/*
 * Repository viewer header (not part of the original source file):
 *
 * view waaad/dispatch.c @ 421:fcd65ebc7c43
 *
 * Parse incoming answers
 * author Sebastien Decugis <sdecugis@nict.go.jp>
 * date Tue, 23 Jun 2009 16:09:21 +0900
 * parents 316bb3f38d04
 * children 5e793a4e2450
 * line wrap: on
 * line source
 */

/*********************************************************************************************************
* Software License Agreement (BSD License)                                                               *
* Author: Sebastien Decugis <sdecugis@nict.go.jp>							 *
*													 *
* Copyright (c) 2009, WIDE Project and NICT								 *
* All rights reserved.											 *
* 													 *
* Redistribution and use of this software in source and binary forms, with or without modification, are  *
* permitted provided that the following conditions are met:						 *
* 													 *
* * Redistributions of source code must retain the above 						 *
*   copyright notice, this list of conditions and the 							 *
*   following disclaimer.										 *
*    													 *
* * Redistributions in binary form must reproduce the above 						 *
*   copyright notice, this list of conditions and the 							 *
*   following disclaimer in the documentation and/or other						 *
*   materials provided with the distribution.								 *
* 													 *
* * Neither the name of the WIDE Project or NICT nor the 						 *
*   names of its contributors may be used to endorse or 						 *
*   promote products derived from this software without 						 *
*   specific prior written permission of WIDE Project and 						 *
*   NICT.												 *
* 													 *
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 	 *
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 	 *
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF   *
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.								 *
*********************************************************************************************************/

/* Dispatch module.
 * 
 * See dispatch.h and dispatch-api.h for more information on the functions and types involved.
 */

#include "waaad-internal.h"

/* Read-write lock for dictionary callbacks. Note that if the dictionary object is destroyed, this lock is not taken.
 * In this file it is held for reading by the dispatch thread while it calls the handlers, and for writing
 * by disp_register / disp_unregister while they change the handler lists. */
static pthread_rwlock_t	disp_dictlock;

/* List of all registered handlers, whatever their registration type (linked through their "all" member) */
static uti_list_t all_handlers;

/* List of handlers registered for ANY (linked through their "dict" member). Others are stored in the dictionary */
static uti_list_t any_handlers;

/* Thread of the dispatch module; created in disp_init, terminated in disp_fini */
static pthread_t th = (pthread_t) NULL;

/* A callback handler, internal description.
 * This is the object behind the opaque disp_cb_hdl_t pointer given back by disp_register.
 * Each handler is linked in two lists at the same time: all_handlers (through "all") and either
 * any_handlers or a list returned by dict_disp_cb (through "dict"). */
typedef struct {
	uint32_t	eyec;	/* an eye catcher for debug; must be DISP_EYEC for the handle to be accepted by CHECK_HDL */
	uti_list_t	all;	/* link in the all_handlers list. "o" is the head of this object */
	uti_list_t	dict;	/* link in the dictionary cb_list (or any_handlers). "o" is the head of this object */
	disp_reg_t	how;	/* the "how" registration parameter */
	disp_reg_val_t	when;	/* the "when" registration parameter (copy of the caller's structure; zeroed if none was given) */
	disp_cb_t	cb;	/* pointer to the callback function */
} _disp_cb_hdl_t;

/* Value stored in the eyec field of every handler created by disp_register */
#define DISP_EYEC	0xD15241C1

/* Cast any pointer to a pointer to the internal handler structure (no check is done) */
#define _H( _ptr ) 			\
	((_disp_cb_hdl_t *)( _ptr ))

/* Evaluates to true if _hdl is not NULL and carries the expected eye catcher. _hdl is evaluated twice. */
#define CHECK_HDL( _hdl ) 		\
	( ((_hdl) != NULL) && (_H(_hdl)->eyec == DISP_EYEC) )

/**************************************************************************************/

/* Find the application of a message and check the message against the dictionary rules.
 *
 * Parameters:
 *  msg : in/out. The message to check. Set to NULL when the message has been freed here.
 *  app : out. Receives the result of the dictionary search on the application id found in the message header.
 *
 * Return value:
 *  0      with *msg unchanged : the message passed msg_parse_rules.
 *  0      with *msg == NULL   : the message was discarded (application not found in the dictionary, or
 *                               the message failed the dictionary check with EBADMSG). This is not an error
 *                               for the caller: a bad message must not stop the dispatch thread.
 *  != 0                       : a function failed. If the failure came from msg_parse_rules, the message
 *                               has been freed and *msg is NULL; if msg_data or dict_search failed,
 *                               the message is left untouched.
 *
 * Creating the error answers (for requests) is not supported yet, see the TODO notes below.
 */
static int parse_or_error(msg_t ** msg, dict_object_t ** app) 
{
	int ret = 0;
	msg_data_t * hdr;
	dict_object_t * rule = NULL;
	
	TRACE_DEBUG(INFO, "Not supported: create appropriate error message(s) if needed");
	
	CHECK_FCT(  msg_data(*msg, &hdr)  );
	
	/* Now we search the application of the message */
	CHECK_FCT( dict_search( DICT_APPLICATION, APPLICATION_BY_ID_REF, &hdr->msg_appl, app, 0 ) );
	if (*app == NULL) {
		/* In case of error, we should answer a DIAMETER_APPLICATION_UNSUPPORTED error */
		TRACE_DEBUG(INFO, "Error: TODO reply DIAMETER_APPLICATION_UNSUPPORTED if it's a request");
		/* ret is still 0 here: the message is silently discarded */
		goto end;
	}
		
	CHECK_FCT_DO( ret = msg_parse_rules( *msg, &rule ), /* handle error later, but log here */ );
	
	if (ret == 0)
		return 0;
	
	if (ret == EBADMSG) {
		/* TO DO: handle the case where the function returns an error, create the
		 appropriate error message if the *msg was a request */
		TRACE_DEBUG(INFO, "A message failed the dictionary check; discarded");
		
		msg_dump_walk(INFO, *msg);
		
		/* The problem is in the received message, not in the daemon: after discarding the message
		 we report success so that the caller continues with the next message. */
		ret = 0;
	}
	/* Any other value of ret is an internal error and is returned to the caller after the message is freed */

end:
	CHECK_FCT( msg_free(*msg, 1) );
	*msg = NULL;
	return ret;
}

/**************************************************************************************/

/* Call one dispatch callback from within disp_th.
 *  _cb   : the callback function pointer.
 *  _pmsg : pointer to the message pointer; the callback receives it and may set the message to NULL.
 *  _avp  : second argument passed to the callback (an AVP, or NULL).
 * If the call is reported as failed by CHECK_FCT_DO, or if the message pointer is NULL afterwards,
 * control jumps to the "c_continue" label, which must exist in the function using this macro. */
#define loc_CALL_CB( _cb, _pmsg, _avp ) {								\
	TRACE_DEBUG(ANNOYING, "Calling dispatch callback...");						\
	CHECK_FCT_DO((*(_cb))( (_pmsg), (_avp) ), goto c_continue);					\
	TRACE_DEBUG(ANNOYING, "Dispatch callback returned, %p", *(_pmsg));				\
	if (*(_pmsg) == NULL)										\
		goto c_continue;									\
}
		

/* Dispatch thread */
/* Dispatch thread.
 *
 * Endless loop that picks the messages from g_meq_local one at a time and, for each of them:
 *  1) if the message is an answer (CMD_FLAG_REQUEST is not set), retrieves the corresponding query and calls
 *     the answer callback stored with this query, if any. This callback may set the message to NULL,
 *     in which case the next steps are skipped.
 *  2) calls parse_or_error(); the message may be discarded (set to NULL) there.
 *  3) with disp_dictlock held for reading, calls the handlers in this order: the ANY handlers, the handlers of
 *     each AVP found while walking the message, the handlers of the command, the handlers of the application.
 *     The calls stop as soon as a handler fails or sets the message to NULL (see loc_CALL_CB).
 *  4) frees the message if it still exists.
 *
 * arg is not used. The function only returns (always NULL) after a failure, once the WDE_ERROR event was sent.
 */
static void * disp_th(void * arg)
{
	/* Pick message in the queue and handle it */
	THREAD_NAME( "Dispatch" );
	
	do {
		msg_t * msg = NULL;
		msg_avp_t * avp = NULL;
		msg_data_t * hdr;
		int error = 0;			/* set to 1 (at c_error) when the thread must stop after the lock is released */
		uti_list_t * li = NULL;		/* iterator in the handler lists */
		uti_list_t * sem = NULL;	/* sentinel of the list returned by dict_disp_cb */
		_disp_cb_hdl_t * hdl = NULL;
		dict_object_t * msg_app = NULL;
		dict_object_t * msg_cmd = NULL;
				
		/* Explicit cancellation point, once per iteration */
		pthread_testcancel();
		
		/* pick next message in global queue for local delivery */
		CHECK_FCT_DO(  meq_get( g_meq_local, &msg ),  goto error );
		
		/* NOTE(review): the "goto error" exits located between this point and the handlers section leave
		   the loop without calling msg_free on msg -- confirm this is acceptable since the thread terminates. */
		
		/* Retrieve some flag values */
		CHECK_FCT_DO(  msg_data(msg, &hdr), goto error  );
		
		/* First, if the original request was registered with a callback and we receive the answer, call it. */
		if ((hdr->msg_flags & CMD_FLAG_REQUEST) == 0 ) {
			msg_t * qry = NULL;
			void (*anscb)(void *, msg_t **) = NULL;
			void * data = NULL;
			
			/* An answer without an associated query is treated as a fatal error */
			CHECK_FCT_DO(  msg_answ_getq( msg, &qry ), goto error  );
			ASSERT(qry != NULL);
			if (qry == NULL)
				goto error;
			
			CHECK_FCT_DO(  msg_get_anscb(qry, &anscb, &data ), goto error  );
			
			/* If a callback was registered, pass the message to it */
			if (anscb != NULL) {
				(void)msg_parse_dict(msg); /* This may help the callback function */
				
				TRACE_DEBUG(FULL, "Calling callback registered when query was sent (%p, %p)", anscb, data);
				(*anscb)(data, &msg);
				
				if (msg == NULL) {
					TRACE_DEBUG(FULL, "The message was handled by msg_send callback, skipping dispatch module callbacks");
					continue;
				}
			}
		}
		
		/* Now parse the message against the dictionary and rules */
		CHECK_FCT_DO(  parse_or_error(&msg, &msg_app), goto error  );
		if (msg == NULL)
			continue;
		
		/* Now we'll call the dispatch handlers, lock the appropriate rdlock */
		CHECK_POSIX_DO( pthread_rwlock_rdlock(&disp_dictlock), goto error );
		/* From here and until the matching pthread_cleanup_pop, leave only through c_error or c_continue.
		   cleanup_rwlock is defined elsewhere; presumably it releases the lock if the thread is cancelled. */
		pthread_cleanup_push( cleanup_rwlock, &disp_dictlock );
		
		/* Start with calling all ANY callbacks */
		for (li = any_handlers.next; (li != &any_handlers); li = li->next) {
			hdl = _H(li->o);
			loc_CALL_CB( hdl->cb, &msg, NULL );
		}
		
		/* Now get the reference to the message dict object */
		CHECK_FCT_DO(  msg_model( msg, &msg_cmd ), goto c_error  );
		
		/* Now we'll parse the message to search for AVPs callbacks */
		CHECK_FCT_DO(  msg_browse( msg, MSG_BRW_FIRST_CHILD, &avp, NULL ), goto c_error  );
		
		while (avp != NULL) {
			dict_object_t * avp_model = NULL;
			
			/* AVPs without a dictionary model are skipped: no handler can be registered for them */
			CHECK_FCT_DO(  msg_model( avp, &avp_model ), goto c_error  );
			if (avp_model) {
				dict_object_t * avp_type = NULL;
				dict_object_t * avp_constant = NULL;	/* stays NULL if the value of the AVP is not a known constant */
				
				/* Find the list of callbacks for this AVP */
				CHECK_FCT_DO(  dict_disp_cb( DICT_AVP, avp_model, &sem ), goto c_error  );
				
				/* Check if the AVP has a constant value */
				CHECK_FCT_DO(  dict_search(DICT_TYPE, TYPE_OF_AVP, avp_model, &avp_type, 0), goto c_error  );
				if (avp_type) {
					dict_type_enum_request_t  request;
					msg_avp_data_t *avp_data;
					
					memset(&request, 0, sizeof(request));
					request.type_obj = avp_type;
					CHECK_FCT_DO(  msg_avp_data( avp, &avp_data ), goto c_error  );
					ASSERT( avp_data );
					/* NOTE(review): avp_data->avp_data is read without a NULL check -- confirm that it is
					   always set for an AVP that has a type, once the message passed parse_or_error. */
					memcpy(&request.search.enum_value, avp_data->avp_data, sizeof(avp_value_t));
					CHECK_FCT_DO( dict_search( DICT_TYPE_ENUM, ENUM_BY_STRUCT, &request, &avp_constant, 0), goto c_error  );
				}
				
				/* Call all matching callbacks */
				for (li = sem->next; li != sem; li = li->next) {
					hdl = _H(li->o);
					
					ASSERT( hdl->when.avp == avp_model );
					
					/* Handlers registered for a specific value are skipped if the AVP has another value */
					if ((hdl->how == DISP_REG_AVP_ENUMVAL) && (hdl->when.value != avp_constant))
						continue;
					
					/* A NULL command or application in the registration means "any" */
					if ((hdl->when.command != NULL) && (hdl->when.command != msg_cmd))
						continue;
					
					if ((hdl->when.app_id != NULL) && (hdl->when.app_id != msg_app))
						continue;
					
					/* Ok, this callback should be called */
					loc_CALL_CB( hdl->cb, &msg, avp );
				}
			}
			/* Go to next AVP */
			CHECK_FCT_DO(  msg_browse( avp, MSG_BRW_WALK, &avp, NULL ), goto c_error  );
		}
			
		/* Now call the callbacks registered for the command */
		CHECK_FCT_DO(  dict_disp_cb( DICT_COMMAND, msg_cmd, &sem ), goto c_error  );
		for (li = sem->next; li != sem; li = li->next) {
			hdl = _H(li->o);

			ASSERT( hdl->when.command == msg_cmd );

			if ((hdl->when.app_id != NULL) && (hdl->when.app_id != msg_app))
				continue;

			/* Ok, this callback should be called */
			loc_CALL_CB( hdl->cb, &msg, NULL );
		}
		
		/* Finally call the application's callbacks */
		CHECK_FCT_DO(  dict_disp_cb( DICT_APPLICATION, msg_app, &sem ), goto c_error  );
		for (li = sem->next; li != sem; li = li->next) {
			hdl = _H(li->o);

			ASSERT( hdl->when.app_id == msg_app );

			loc_CALL_CB( hdl->cb, &msg, NULL );
		}
		
		/* If we arrive here, it means no callback has set msg to NULL... */
		TRACE_DEBUG(INFO, "Error: the message was not handled in dispatch module...TODO");
			
		/* TODO: request => forward or return error? answer => discard */
	
		
		goto c_continue;
c_error:
		/* Fatal error while the lock is held: remember it, the thread stops after the lock is released */
		error = 1;
c_continue:
		/* some compilers complain if there is no instruction here... => */ ;
		/* Remove the cleanup handler without running it (0); the lock is released explicitly just after */
		pthread_cleanup_pop( 0 );
		CHECK_POSIX_DO( pthread_rwlock_unlock(&disp_dictlock), goto error );
		
		/* The message is still ours if no callback took it */
		if (msg) {
			CHECK_FCT_DO( msg_free(msg, 1), goto error );
		}
		
		if (error)
			goto error;
		
	} while (1);	
error:
	/* An error occurred */	
	CHECK_FCT_DO(  uti_event_send(WDE_ERROR),  /* nothing */  );
	TRACE_DEBUG(INFO, "Thread terminated");
	return NULL;
}

/**************************************************************************************/
/* Initialize the module */
/* Module start-up: prepare the (empty) handler lists and the lock, then start the dispatch thread.
 * Returns 0 on success; the CHECK_POSIX macros take care of the failure cases. */
int disp_init ( void )
{
	TRACE_ENTRY("");
	
	/* Both lists are sentinels only, they have no owner object */
	uti_list_init(&any_handlers, NULL);
	uti_list_init(&all_handlers, NULL);
	
	/* The lock has to exist before the thread that uses it */
	CHECK_POSIX( pthread_rwlock_init(&disp_dictlock, NULL) );
	CHECK_POSIX(  pthread_create( &th, NULL, disp_th, NULL )  );
	
	return 0;
}

/* End of the module */
/* End of the module.
 * Terminates the dispatch thread (if it was created), unregisters and frees all the handlers that
 * are still registered, then destroys the lock.
 * Always returns 0: a failure in one step does not prevent the following steps from being attempted.
 */
int disp_fini ( void )
{
	TRACE_ENTRY("");
	
	if (th != (pthread_t)NULL) {
		CHECK_FCT_DO(  _thread_term(&th), /* continue */ );
	}
	
	while (!IS_LIST_EMPTY(&all_handlers)) {
		/* The elements of all_handlers are the "all" members of the handlers, which are not located
		   at the beginning of the structure: the handler itself is found in the "o" field of the link. */
		if (disp_unregister((disp_cb_hdl_t *)(all_handlers.next->o)) != 0) {
			/* The first element could not be removed; stop here instead of looping forever on it */
			break;
		}
	}
	
	CHECK_POSIX_DO( pthread_rwlock_destroy(&disp_dictlock), /* continue */ );
	
	return 0;
}

/* Register a new callback. */
/* Register a new callback.
 *
 * Parameters:
 *  cb     : the callback function; must not be NULL.
 *  how    : the kind of registration, between DISP_REG_ANY and DISP_REG_AVP_ENUMVAL.
 *  when   : the criteria of the registration. May be NULL only when how is DISP_REG_ANY.
 *           For DISP_REG_APPID, at least one of the two lower bits of when->flags must be set
 *           (0x1: advertise as authentication application, 0x2: as accounting application).
 *           The structure is copied, the caller keeps the ownership of its own.
 *  handle : if not NULL, receives the handle of the new registration (to be passed to disp_unregister).
 *
 * For DISP_REG_APPID registrations, the application is also added to g_conf->supported_apps_list
 * (kept sorted by application id and terminated by a zeroed element) for each type not already present.
 *
 * Returns 0 on success. The CHECK_* macros return an error code on invalid parameters or when a call fails.
 * NOTE(review): when a failure occurs after the handler has been inserted in the lists (in the
 * DISP_REG_APPID part), the handler remains registered although an error is returned -- confirm intent.
 */
int disp_register ( disp_cb_t cb, disp_reg_t how, disp_reg_val_t * when, disp_cb_hdl_t ** handle )
{
	_disp_cb_hdl_t * hdl = NULL;
	uti_list_t * where = NULL;	/* the list in which the "dict" link of the new handler is inserted */
	
	TRACE_ENTRY("%p %d %p %p", cb, how, when, handle);
	
	CHECK_PARAMS( cb );
	CHECK_PARAMS( (how >= DISP_REG_ANY) && (how <= DISP_REG_AVP_ENUMVAL) );
	CHECK_PARAMS( (how == DISP_REG_ANY) || (when != NULL) );
	CHECK_PARAMS( (how != DISP_REG_APPID) || (when->flags & 0x3) ); 
	
	/* Find the list matching the registration type */
	switch (how) {
		case DISP_REG_ANY:
			where = &any_handlers;
			break;
	
		case DISP_REG_APPID:
			CHECK_FCT( dict_disp_cb(DICT_APPLICATION, when->app_id, &where) );
			break;
			
		case DISP_REG_CC:
			CHECK_FCT( dict_disp_cb(DICT_COMMAND, when->command, &where) );
			break;
			
		case DISP_REG_AVP:
		case DISP_REG_AVP_ENUMVAL:
			CHECK_FCT( dict_disp_cb(DICT_AVP, when->avp, &where) );
			break;
	}
	
	/* Create the new handler */
	CHECK_MALLOC( hdl = (_disp_cb_hdl_t *) malloc(sizeof(_disp_cb_hdl_t)) );
	memset(hdl, 0, sizeof(_disp_cb_hdl_t));
	
	hdl->eyec = DISP_EYEC;
	uti_list_init(&hdl->all, hdl);
	uti_list_init(&hdl->dict, hdl);
	hdl->how = how;
	if (when)
		memcpy(&hdl->when, when, sizeof(disp_reg_val_t));
	hdl->cb = cb;
	
	/* Insert the handler at the end of both lists; the dispatch thread reads them under the read lock */
	CHECK_POSIX( pthread_rwlock_wrlock(&disp_dictlock) );
	
	uti_list_insert_before(&all_handlers, &hdl->all);
	uti_list_insert_before(where, &hdl->dict);
	
	CHECK_POSIX( pthread_rwlock_unlock(&disp_dictlock) );
	
	if (handle)
		*handle = (disp_cb_hdl_t *) hdl;
	
	/* In addition, advertise the support for application in CER/CEA if appropriate */
	if (how == DISP_REG_APPID) {
		int i = 0;
		int add_auth;
		int add_acct;
		int added;	/* number of elements to add in the list: 0, 1 or 2 */
		dict_application_data_t appl_data;
		dict_object_t * vendor = NULL;
		dict_vendor_data_t vendor_data;	/* only valid when vendor is not NULL */
		
		add_auth = (when->flags & 0x1) ? 1 : 0;
		add_acct = (when->flags & 0x2) ? 1 : 0;
		
		/* Get the app id of the application being added */
		CHECK_FCT( dict_getval( when->app_id, &appl_data) );
		
		/* Get the vendor and data */
		CHECK_FCT( dict_search( DICT_VENDOR, VENDOR_OF_APPLICATION, when->app_id, &vendor, 0) );
		if (vendor) {
			CHECK_FCT( dict_getval( vendor, &vendor_data) );
		}
		
		/* First search if we have to change the list, and how much.
		   The list is sorted by application id, so we can stop at the first greater id. */
		for (i = 0; i < g_conf->supported_apps_nb; i++) {
			if (g_conf->supported_apps_list[i].a < appl_data.application_id)
				continue;
			if (g_conf->supported_apps_list[i].a > appl_data.application_id)
				break;
			
			/* Same application id: do not add again a type that is already advertised */
			if (g_conf->supported_apps_list[i].t == APP_TYPE_AUTH)
				add_auth = 0;
			if (g_conf->supported_apps_list[i].t == APP_TYPE_ACCT)
				add_acct = 0;
		}
		
		added = add_auth + add_acct;
		
		if (added) {
			peer_appl_t * new_list = NULL;

			/* Grow the array (one more element for the terminator). The result goes through a temporary
			   pointer so that the current list remains valid and consistent with supported_apps_nb
			   if the reallocation fails. */
			CHECK_MALLOC( new_list = (peer_appl_t *)realloc(g_conf->supported_apps_list, (g_conf->supported_apps_nb + added + 1) * sizeof(peer_appl_t)) );
			g_conf->supported_apps_list = new_list;
			g_conf->supported_apps_nb += added;

			/* Set the last element to 0 */
			memset( &g_conf->supported_apps_list[g_conf->supported_apps_nb], 0, sizeof(peer_appl_t));

			/* Move elements forward from the end; i starts on the last element of the old list */
			i = g_conf->supported_apps_nb - (1 + added);
			while ((i >= 0) && (g_conf->supported_apps_list[i].a > appl_data.application_id)) {
				memcpy( &g_conf->supported_apps_list[i + added], &g_conf->supported_apps_list[i], sizeof(peer_appl_t));
				i--;
			}
			
			/* Now the free slots are at indexes i + 1 to i + added. Fill them from the last one:
			   the accounting entry (if any) is placed after the authentication entry (if any). */
			if (add_acct) {
				g_conf->supported_apps_list[i + added].a = appl_data.application_id;
				g_conf->supported_apps_list[i + added].v = vendor ? vendor_data.vendor_id : 0;
				g_conf->supported_apps_list[i + added].t = APP_TYPE_ACCT;
				i--;
			}
			if (add_auth) {
				g_conf->supported_apps_list[i + added].a = appl_data.application_id;
				g_conf->supported_apps_list[i + added].v = vendor ? vendor_data.vendor_id : 0;
				g_conf->supported_apps_list[i + added].t = APP_TYPE_AUTH;
			}
		}
	}
	
	return 0;
}

/* Remove a registered callback. */
/* Remove a registered callback.
 * handle is the value received from disp_register; it is rejected by CHECK_PARAMS if it is NULL
 * or if it does not carry the expected eye catcher. On success (0) the handler is removed from
 * the lists it belongs to and its memory is released: the handle must not be used anymore. */
int disp_unregister ( disp_cb_hdl_t * handle )
{
	/* Typed view of the opaque handle (plain cast; only dereferenced after the check below) */
	_disp_cb_hdl_t * hdl = _H(handle);
	
	TRACE_ENTRY("%p", handle);
	
	CHECK_PARAMS( CHECK_HDL(handle) );
	
	/* The lists are modified under the write lock, so that the dispatch thread never sees them half-changed */
	CHECK_POSIX( pthread_rwlock_wrlock(&disp_dictlock) );
	
	uti_list_unlink(&hdl->all);
	uti_list_unlink(&hdl->dict);
	
	CHECK_POSIX( pthread_rwlock_unlock(&disp_dictlock) );
	
	/* No list references the handler anymore */
	free(hdl);
	
	return 0;
}

/* Repository viewer footer (not part of the original source file): "Welcome to our mercurial repository" */