changeset 90:2c9444152e4b

Added the dispatch thread code
author Sebastien Decugis <sdecugis@nict.go.jp>
date Mon, 07 Dec 2009 18:32:30 +0900
parents 3f8b437bcb66
children d5ba667e17c0
files freeDiameter/dispatch.c freeDiameter/main.c freeDiameter/routing.c freeDiameter/tests/testdisp.c include/freeDiameter/libfreeDiameter.h libfreeDiameter/dispatch.c libfreeDiameter/messages.c
diffstat 7 files changed, 255 insertions(+), 40 deletions(-) [+]
line wrap: on
line diff
--- a/freeDiameter/dispatch.c	Mon Dec 07 16:56:42 2009 +0900
+++ b/freeDiameter/dispatch.c	Mon Dec 07 18:32:30 2009 +0900
@@ -66,7 +66,185 @@
 }
 
 
-/* Note: if the message is for local delivery, we should test for duplicate
-  (draft-asveren-dime-dupcons-00). This may conflict with path validation decisions, no clear answer yet */
+/* The dispatching thread(s) */
+
+enum thread_state { INITIAL = 0, RUNNING = 1, TERMINATED = 2 };
+static void cleanup_state(void * state_loc)
+{
+	if (state_loc)
+		*(enum thread_state *)state_loc = TERMINATED;
+}
+
+static pthread_mutex_t order_lock = PTHREAD_MUTEX_INITIALIZER;
+static enum { RUN = 0, STOP = 1 } order_val;
+
+static void * dispatch_thread(void * arg)
+{
+	TRACE_ENTRY("%p", arg);
+	
+	/* Set the thread name */
+	{
+		char buf[48];
+		snprintf(buf, sizeof(buf), "Dispatch %p", arg);
+		fd_log_threadname ( buf );
+	}
 
+	pthread_cleanup_push( cleanup_state, arg );
+	
+	/* Mark the thread running */
+	*(enum thread_state *)arg = RUNNING;
+	
+	do {
+		struct msg * msg;
+		struct msg_hdr * hdr;
+		int is_req = 0;
+		struct session * sess;
+		enum disp_action action;
+		const char * ec = NULL;
+		const char * em = NULL;
+		
+		/* Test the environment */
+		{
+			int must_stop;
+			CHECK_POSIX_DO( pthread_mutex_lock(&order_lock), goto end ); /* we lock to flush the caches */
+			must_stop = (order_val == STOP);
+			CHECK_POSIX_DO( pthread_mutex_unlock(&order_lock), goto end );
+			if (must_stop)
+				goto end;
+			
+			pthread_testcancel();
+		}
+		
+		/* Ok, we are allowed to run */
+		
+		/* Get the next message from the queue */
+		CHECK_FCT_DO( fd_fifo_get ( fd_g_local, &msg ), goto fatal_error );
+		
+		if (TRACE_BOOL(FULL)) {
+			TRACE_DEBUG(FULL, "Picked next message:");
+			fd_msg_dump_one(FULL, msg);
+		}
+		
+		/* Read the message header */
+		CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto fatal_error );
+		is_req = hdr->msg_flags & CMD_FLAG_REQUEST;
+		
+		/* Note: if the message is for local delivery, we should test for duplicate
+		  (draft-asveren-dime-dupcons-00). This may conflict with path validation decisions, no clear answer yet */
+		
+		/* At this point, we probably need to understand the message content, so parse the message */
+		CHECK_FCT_DO( fd_msg_parse_dict ( msg, fd_g_config->cnf_dict ), /* Ignore error */);
+		
+		/* First, if the original request was registered with a callback and we receive the answer, call it. */
+		if ( ! is_req ) {
+			struct msg * qry;
+			void (*anscb)(void *, struct msg **) = NULL;
+			void * data = NULL;
+			
+			/* Retrieve the corresponding query */
+			CHECK_FCT_DO( fd_msg_answ_getq( msg, &qry ), goto fatal_error );
+			
+			/* Retrieve any registered handler */
+			CHECK_FCT_DO( fd_msg_anscb_get( qry, &anscb, &data ), goto fatal_error );
+			
+			/* If a callback was registered, pass the message to it */
+			if (anscb != NULL) {
+				
+				TRACE_DEBUG(FULL, "Calling callback registered when query was sent (%p, %p)", anscb, data);
+				(*anscb)(data, &msg);
+				
+				if (msg == NULL) {
+					/* Ok, the message is now handled, we can skip to the next one */
+					continue;
+				}
+			}
+		}
+		
+		/* Retrieve the session of the message */
+		CHECK_FCT_DO( fd_msg_sess_get(fd_g_config->cnf_dict, msg, &sess, NULL), goto fatal_error );
+		
+		/* Now, call any callback registered for the message */
+		CHECK_FCT_DO( fd_msg_dispatch ( &msg, sess, &action, &ec), goto fatal_error );
+		
+		/* Now, act depending on msg and action and ec */
+		if (!msg)
+			continue;
+		
+		switch ( action ) {
+			case DISP_ACT_CONT:
+				/* No callback has handled the message, let's reply with a generic error */
+				em = "The message was not handled by any extension callback";
+				ec = "DIAMETER_COMMAND_UNSUPPORTED";
+			
+			case DISP_ACT_ERROR:
+				/* We have a problem with delivering the message */
+				if (ec == NULL) {
+					ec = "DIAMETER_UNABLE_TO_COMPLY";
+				}
+				
+				if (!is_req) {
+					TRACE_DEBUG(INFO, "Received an answer to a localy issued query, but no handler processed this answer!");
+					fd_msg_dump_walk(INFO, msg);
+					fd_msg_free(msg);
+					msg = NULL;
+					break;
+				}
+				
+				/* Create an answer with the error code and message */
+				CHECK_FCT_DO( fd_msg_new_answer_from_req ( fd_g_config->cnf_dict, &msg, MSGFL_ANSW_ERROR ), goto fatal_error );
+				CHECK_FCT_DO( fd_msg_rescode_set(msg, (char *)ec, (char *)em, NULL, 1 ), goto fatal_error );
+				
+			case DISP_ACT_SEND:
+				/* Now, send the message */
+				CHECK_FCT_DO( fd_fifo_post(fd_g_outgoing, &msg), goto fatal_error );
+		}
+		
+		/* We're done with this message */
+	
+	} while (1);
+	
+fatal_error:
+	TRACE_DEBUG(INFO, "An error occurred in dispatch module! Thread is terminating...");
+	CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), );
+	
+end:	
+	/* Mark the thread as terminated */
+	pthread_cleanup_pop(1);
+	return NULL;
+}
 
+/* Later (same as routing): TODO("Set threshold on local queue"); */
+static pthread_t my_dispatch = (pthread_t)NULL;
+static enum thread_state my_state = INITIAL;
+
+/* Initialize the Dispatch module */
+int fd_disp_init(void)
+{
+	order_val = RUN;
+	
+	CHECK_POSIX( pthread_create( &my_dispatch, NULL, dispatch_thread, &my_state ) );
+	
+	return 0;
+}
+
+int fd_disp_cleanstop(void)
+{
+	CHECK_POSIX( pthread_mutex_lock(&order_lock) );
+	order_val = STOP;
+	CHECK_POSIX( pthread_mutex_unlock(&order_lock) );
+
+	return 0;
+}
+
+int fd_disp_fini(void)
+{
+	/* Wait for a few milliseconds for the thread to complete, by monitoring my_state */
+
+	/* Then if needed, cancel the thread */
+	
+	/* Remove all remaining handlers */
+	fd_disp_unregister_all();
+	
+	return ENOTSUP;
+}
+
--- a/freeDiameter/main.c	Mon Dec 07 16:56:42 2009 +0900
+++ b/freeDiameter/main.c	Mon Dec 07 18:32:30 2009 +0900
@@ -99,6 +99,7 @@
 	CHECK_FCT(  fd_queues_init()  );
 	CHECK_FCT(  fd_msg_init()  );
 	CHECK_FCT(  fd_p_expi_init()  );
+	CHECK_FCT(  fd_disp_init()  );
 	CHECK_FCT(  fd_rt_init()  );
 	
 	/* Parse the configuration file */
@@ -161,9 +162,11 @@
 	
 	/* cleanups */
 	CHECK_FCT_DO( fd_servers_stop(), /* Stop accepting new connections */ );
-	TODO("Stop dispatch thread(s) properly (no cancel yet)");
+	CHECK_FCT_DO( fd_disp_cleanstop(), /* Stop dispatch thread(s) properly (no cancel yet) */ );
 	CHECK_FCT_DO( fd_peer_fini(), /* Stop all connections */ );
-	TODO("Stop dispatch & routing threads");
+	CHECK_FCT_DO( fd_rt_fini(), /* Stop routing threads */ );
+	CHECK_FCT_DO( fd_disp_fini(), /* Stop dispatch thread */ );
+	
 	CHECK_FCT_DO( fd_ext_fini(), /* Cleaup all extensions */ );
 	TODO("Cleanup queues (dump all remaining messages ?)");
 	
--- a/freeDiameter/routing.c	Mon Dec 07 16:56:42 2009 +0900
+++ b/freeDiameter/routing.c	Mon Dec 07 18:32:30 2009 +0900
@@ -862,6 +862,14 @@
 {
 	CHECK_FCT_DO( fd_thr_term(&rt_in ), /* continue */);
 	CHECK_FCT_DO( fd_thr_term(&rt_out), /* continue */);
+	
+	/* Cleanup all remaining handlers */
+	while (!FD_IS_LIST_EMPTY(&rt_fwd_list)) {
+		CHECK_FCT_DO( fd_rt_fwd_unregister ( (void *)rt_fwd_list.next, NULL ), /* continue */ );
+	}
+	while (!FD_IS_LIST_EMPTY(&rt_out_list)) {
+		CHECK_FCT_DO( fd_rt_out_unregister ( (void *)rt_out_list.next, NULL ), /* continue */ );
+	}
 	return 0;
 }
 
--- a/freeDiameter/tests/testdisp.c	Mon Dec 07 16:56:42 2009 +0900
+++ b/freeDiameter/tests/testdisp.c	Mon Dec 07 18:32:30 2009 +0900
@@ -105,6 +105,7 @@
 	enum disp_action action;
 	struct disp_hdl * hdl[NB_CB];
 	struct disp_when when;
+	const char * ec;
 	
 	/* First, initialize the daemon modules */
 	INIT_FD();
@@ -149,7 +150,7 @@
 		/* Check this handler is called for a message */
 		msg = new_msg( 0, cmd1, avp1, NULL, 0 );
 		memset(cbcalled, 0, sizeof(cbcalled));
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( DISP_ACT_CONT, action );
 		
@@ -173,7 +174,7 @@
 		/* Check the callbacks are called as appropriate */
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 0, cmd1, avp1, NULL, 0 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 0, cbcalled[1] );
 		CHECK( 0, cbcalled[2] );
@@ -184,7 +185,7 @@
 		
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 1, cmd1, avp1, NULL, 0 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 1, cbcalled[1] );
 		CHECK( 0, cbcalled[2] );
@@ -195,7 +196,7 @@
 		
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 2, cmd1, avp1, NULL, 0 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 0, cbcalled[1] );
 		CHECK( 1, cbcalled[2] );
@@ -229,7 +230,7 @@
 		/* Check the callbacks are called as appropriate */
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 0, cmd1, avp1, NULL, 0 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 1, cbcalled[1] );
 		CHECK( 0, cbcalled[2] );
@@ -239,7 +240,7 @@
 		
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 2, cmd1, avp1, NULL, 0 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 1, cbcalled[1] );
 		CHECK( 1, cbcalled[2] );
@@ -249,7 +250,7 @@
 		
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 2, cmd2, avp1, NULL, 0 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 0, cbcalled[1] );
 		CHECK( 0, cbcalled[2] );
@@ -259,7 +260,7 @@
 		
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 1, cmd2, NULL, avp2, 0 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 0, cbcalled[1] );
 		CHECK( 0, cbcalled[2] );
@@ -309,7 +310,7 @@
 		/* Check the callbacks are called as appropriate */
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 0, cmd1, NULL, NULL, 0 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 0, cbcalled[1] );
 		CHECK( 0, cbcalled[2] );
@@ -321,7 +322,7 @@
 		
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 0, cmd1, avp1, NULL, 0 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 1, cbcalled[1] );
 		CHECK( 1, cbcalled[2] );
@@ -333,7 +334,7 @@
 		
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 1, cmd2, avp1, NULL, 0 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 1, cbcalled[1] );
 		CHECK( 0, cbcalled[2] );
@@ -345,7 +346,7 @@
 		
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 1, cmd1, avp1, NULL, 0 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 1, cbcalled[1] );
 		CHECK( 1, cbcalled[2] );
@@ -357,7 +358,7 @@
 		
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 1, cmd1, avp1, avp2, 0 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 1, cbcalled[1] );
 		CHECK( 1, cbcalled[2] );
@@ -370,7 +371,7 @@
 		
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 1, cmd1, NULL, avp2, 1 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 0, cbcalled[1] );
 		CHECK( 0, cbcalled[2] );
@@ -383,7 +384,7 @@
 		
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 1, cmd1, NULL, avp2, 2 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 0, cbcalled[1] );
 		CHECK( 0, cbcalled[2] );
@@ -430,7 +431,7 @@
 		/* Check the callbacks are called as appropriate */
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 0, cmd1, avp1, NULL, 0 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 0, cbcalled[1] );
 		CHECK( 0, cbcalled[2] );
@@ -440,7 +441,7 @@
 		
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 1, cmd2, avp1, avp2, 0 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 0, cbcalled[1] );
 		CHECK( 0, cbcalled[2] );
@@ -450,7 +451,7 @@
 		
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 1, cmd2, avp1, avp2, 1 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 1, cbcalled[1] );
 		CHECK( 0, cbcalled[2] );
@@ -460,7 +461,7 @@
 		
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 1, cmd2, avp1, avp2, 2 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 0, cbcalled[1] );
 		CHECK( 0, cbcalled[2] );
@@ -470,7 +471,7 @@
 		
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 1, cmd1, avp1, avp2, 1 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 1, cbcalled[1] );
 		CHECK( 1, cbcalled[2] );
@@ -488,7 +489,7 @@
 			CHECK( 0, fd_msg_avp_setvalue ( avp, &value ) );
 			CHECK( 0, fd_msg_avp_add ( msg, MSG_BRW_LAST_CHILD, avp ) );
 		}
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 1, cbcalled[1] );
 		CHECK( 0, cbcalled[2] );
@@ -512,7 +513,7 @@
 		
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 1, cmd1, avp1, avp2, 1 );
-		CHECK( 12345, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 12345, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 1, cbcalled[1] );
 		CHECK( 1, cbcalled[6] );
@@ -534,7 +535,7 @@
 		
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 1, cmd1, avp1, avp2, 1 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 1, cbcalled[1] );
 		CHECK( 1, cbcalled[8] );
@@ -556,7 +557,7 @@
 		
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 1, cmd1, avp1, avp2, 1 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 1, cbcalled[1] );
 		CHECK( 1, cbcalled[9] );
@@ -587,7 +588,7 @@
 		
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 2, cmd2, avp1, avp2, 2 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 1, cbcalled[1] );
 		CHECK( 1, cbcalled[2] );
@@ -599,7 +600,7 @@
 		CHECK( 0, fd_disp_register( cb_9, DISP_HOW_ANY, &when, &hdl[5] ) );
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 2, cmd2, avp1, avp2, 2 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 0, cbcalled[1] );
 		CHECK( 0, cbcalled[2] );
@@ -612,7 +613,7 @@
 		CHECK( 0, fd_disp_register( cb_9, DISP_HOW_AVP_ENUMVAL, &when, &hdl[5] ) );
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 2, cmd2, avp1, avp2, 2 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 1, cbcalled[1] );
 		CHECK( 1, cbcalled[2] );
@@ -625,7 +626,7 @@
 		CHECK( 0, fd_disp_register( cb_9, DISP_HOW_AVP, &when, &hdl[5] ) );
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 2, cmd2, avp1, avp2, 2 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 1, cbcalled[1] );
 		CHECK( 1, cbcalled[2] );
@@ -638,7 +639,7 @@
 		CHECK( 0, fd_disp_register( cb_9, DISP_HOW_CC, &when, &hdl[5] ) );
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 2, cmd2, avp1, avp2, 2 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 1, cbcalled[1] );
 		CHECK( 1, cbcalled[2] );
@@ -651,7 +652,7 @@
 		CHECK( 0, fd_disp_register( cb_9, DISP_HOW_APPID, &when, &hdl[5] ) );
 		memset(cbcalled, 0, sizeof(cbcalled));
 		msg = new_msg( 2, cmd2, avp1, avp2, 2 );
-		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action ) );
+		CHECK( 0, fd_msg_dispatch ( &msg, sess, &action, &ec ) );
 		CHECK( 1, cbcalled[0] );
 		CHECK( 1, cbcalled[1] );
 		CHECK( 1, cbcalled[2] );
--- a/include/freeDiameter/libfreeDiameter.h	Mon Dec 07 16:56:42 2009 +0900
+++ b/include/freeDiameter/libfreeDiameter.h	Mon Dec 07 18:32:30 2009 +0900
@@ -2352,7 +2352,8 @@
 
 enum disp_action {
 	DISP_ACT_CONT,	/* The next handler should be called, unless *msg == NULL. */
-	DISP_ACT_SEND	/* The updated message must be sent. No further callback is called. */
+	DISP_ACT_SEND,	/* The updated message must be sent. No further callback is called. */
+	DISP_ACT_ERROR	/* An error must be created and sent as a reply -- not valid for callbacks, only for fd_msg_dispatch. */
 };
 /* The callbacks that are registered have the following prototype:
  *  	int dispatch_callback( struct msg ** msg, struct avp * avp, struct session * session, enum disp_action * action );
@@ -2423,6 +2424,9 @@
  */
 int fd_disp_unregister ( struct disp_hdl ** handle );
 
+/* Destroy all handlers */
+void fd_disp_unregister_all ( void );
+
 /*
  * FUNCTION:	fd_msg_dispatch
  *
@@ -2442,7 +2446,7 @@
  *  EINVAL 	: A parameter is invalid.
  *  (other errors)
  */
-int fd_msg_dispatch ( struct msg ** msg, struct session * session, enum disp_action *action );
+int fd_msg_dispatch ( struct msg ** msg, struct session * session, enum disp_action *action, const char ** error_code );
 
 
 
--- a/libfreeDiameter/dispatch.c	Mon Dec 07 16:56:42 2009 +0900
+++ b/libfreeDiameter/dispatch.c	Mon Dec 07 18:32:30 2009 +0900
@@ -199,3 +199,12 @@
 	return 0;
 }
 
+/* Delete all handlers */
+void fd_disp_unregister_all ( void )
+{
+	TRACE_ENTRY("");
+	while (!FD_IS_LIST_EMPTY(&all_handlers)) {
+		CHECK_FCT_DO( fd_disp_unregister(all_handlers.next->o), /* continue */ );
+	}
+	return;
+}
--- a/libfreeDiameter/messages.c	Mon Dec 07 16:56:42 2009 +0900
+++ b/libfreeDiameter/messages.c	Mon Dec 07 18:32:30 2009 +0900
@@ -2232,7 +2232,7 @@
 		goto no_error;
 
 /* Call all dispatch callbacks for a given message */
-int fd_msg_dispatch ( struct msg ** msg, struct session * session, enum disp_action *action)
+int fd_msg_dispatch ( struct msg ** msg, struct session * session, enum disp_action *action, const char ** error_code)
 {
 	struct dictionary  * dict;
 	struct dict_object * app;
@@ -2241,9 +2241,12 @@
 	struct fd_list * cb_list;
 	int ret = 0;
 	
-	TRACE_ENTRY("%p %p %p", msg, session, action);
+	TRACE_ENTRY("%p %p %p %p", msg, session, action, error_code);
 	CHECK_PARAMS( msg && CHECK_MSG(*msg) && action);
 	
+	if (error_code)
+		*error_code = NULL;
+	
 	/* Take the dispatch lock */
 	CHECK_FCT( pthread_rwlock_rdlock(&fd_disp_lock) );
 	pthread_cleanup_push( fd_cleanup_rwlock, &fd_disp_lock );
@@ -2261,8 +2264,17 @@
 	CHECK_FCT_DO( ret = fd_dict_search( dict, DICT_APPLICATION, APPLICATION_BY_ID, &(*msg)->msg_public.msg_appl, &app, 0 ), goto error );
 	
 	if (app == NULL) {
-		/* In that case, maybe we should answer a DIAMETER_APPLICATION_UNSUPPORTED error ? Do we do this here ? */
-		TODO("Reply DIAMETER_APPLICATION_UNSUPPORTED if it's a request ?");
+		if ((*msg)->msg_public.msg_flags & CMD_FLAG_REQUEST) {
+			if (error_code)
+				*error_code = "DIAMETER_APPLICATION_UNSUPPORTED";
+			*action = DISP_ACT_ERROR;
+		} else {
+			TRACE_DEBUG(INFO, "Received an answer to a local query with an unsupported application %d, discarding...",  (*msg)->msg_public.msg_appl);
+			fd_msg_dump_walk(INFO, *msg);
+			fd_msg_free(*msg);
+			*msg = NULL;
+		}
+		goto no_error;
 	}
 	
 	/* So start browsing the message */
"Welcome to our mercurial repository"