changeset 1014:908ffbb81f60

Added a second callback in fd_msg_send_timeout to handle more easily the timeout situation
author Sebastien Decugis <sdecugis@freediameter.net>
date Fri, 29 Mar 2013 17:30:59 +0800
parents 7b5c46505e09
children dc7d6d5f93d3
files extensions/dbg_interactive/messages.i include/freeDiameter/libfdcore.h include/freeDiameter/libfdproto.h libfdcore/fdcore-internal.h libfdcore/messages.c libfdcore/p_sr.c libfdcore/routing_dispatch.c libfdproto/messages.c
diffstat 8 files changed, 124 insertions(+), 54 deletions(-) [+]
line wrap: on
line diff
--- a/extensions/dbg_interactive/messages.i	Fri Mar 29 17:11:45 2013 +0800
+++ b/extensions/dbg_interactive/messages.i	Fri Mar 29 17:30:59 2013 +0800
@@ -40,6 +40,7 @@
 %{
 struct anscb_py_layer {
 	PyObject * cb;
+	PyObject * expcb;
 	PyObject * data;
 };
 
@@ -66,6 +67,7 @@
 
 		result = PyObject_CallFunction(l->cb, "(OO)", PyMsg, l->data);
 		Py_XDECREF(l->cb);
+		Py_XDECREF(l->expcb);
 		Py_XDECREF(l->data);
 		free(l);
 
@@ -85,6 +87,45 @@
 		/* in this case, just delete the message */
 		/* it actually happens automatically when we do nothing. */
 }
+
+static void expcb_python(void *cbdata, DiamId_t sentto, size_t senttolen, struct msg ** msg) {
+	/* The python callback is received in cbdata */
+	PyObject * result, *PyMsg;
+	struct anscb_py_layer * l = cbdata;
+	
+	if (!l) {
+		fd_log_debug("Internal error! Python callback disappeared...");
+		return;
+	}
+	
+	SWIG_PYTHON_THREAD_BEGIN_BLOCK;
+
+	if (!msg || !*msg) {
+		PyMsg = Py_None;
+	} else {
+		PyMsg = SWIG_NewPointerObj((void *)*msg,     SWIGTYPE_p_msg,     0 );
+	}
+
+	result = PyObject_CallFunction(l->expcb, "(Os#O)", PyMsg, sentto, senttolen, l->data);
+	Py_XDECREF(l->cb);
+	Py_XDECREF(l->expcb);
+	Py_XDECREF(l->data);
+	free(l);
+
+	/* The callback is supposed to return a message or NULL */
+	if (!SWIG_IsOK(SWIG_ConvertPtr(result, (void *)msg, SWIGTYPE_p_msg, SWIG_POINTER_DISOWN))) {
+		fd_log_debug("Error: Cannot convert the return value to message.");
+		*msg = NULL;
+	}
+
+	Py_XDECREF(result);
+
+	SWIG_PYTHON_THREAD_END_BLOCK;
+		
+}
+
+
+
 %}
 
 struct msg {
@@ -127,7 +168,7 @@
 	
 	/* SEND THE MESSAGE */
 	%delobject send; /* when this has been called, the msg must not be freed anymore */
-	void send(PyObject * PyCb = NULL, PyObject * data = NULL, unsigned int timeout = 0) {
+	void send(PyObject * PyCb = NULL, PyObject * data = NULL, PyObject * PyExpCb = NULL, unsigned int timeout = 0) {
 		int ret;
 		struct msg * m = $self;
 		struct anscb_py_layer * l = NULL;
@@ -141,6 +182,8 @@
 
 			Py_XINCREF(PyCb);
 			Py_XINCREF(data);
+			Py_XINCREF(PyExpCb);
+			l->expcb = PyExpCb;
 			l->cb = PyCb;
 			l->data = data;
 		}
@@ -149,7 +192,7 @@
 			struct timespec ts;
 			(void) clock_gettime(CLOCK_REALTIME, &ts);
 			ts.tv_sec += timeout;
-			ret = fd_msg_send_timeout(&m, anscb_python, l, &ts);
+			ret = fd_msg_send_timeout(&m, anscb_python, l, expcb_python, &ts);
 		} else {
 			ret = fd_msg_send(&m, PyCb ? anscb_python : NULL, l);
 		}
--- a/include/freeDiameter/libfdcore.h	Fri Mar 29 17:11:45 2013 +0800
+++ b/include/freeDiameter/libfdcore.h	Fri Mar 29 17:30:59 2013 +0800
@@ -462,22 +462,23 @@
  *
  * PARAMETERS:
  *  pmsg 	: Location of the message to be sent on the network (set to NULL on function return to avoid double deletion).
- *  anscb	: A callback to be called when answer is received, if msg is a request (optional for fd_msg_send)
- *  anscb_data	: opaque data to be passed back to the anscb when it is called.
+ *  anscb	: A callback to be called when corresponding answer is received, when sending a request (not used with answers)
+ *  anscb_data	: opaque data to be passed back to the anscb (or expirecb) when it is called.
+ *  expirecb    : (only for fd_msg_send_timeout) If the request did not get an answer before timeout, this callback is called.
  *  timeout     : (only for fd_msg_send_timeout) sets the absolute time until when to wait for an answer. Past this time,
- *                the anscb is called with the request as parameter and the answer will be discarded when received.
+ *                the expirecb is called with the request and the answer will be discarded if received later.
  *
  * DESCRIPTION: 
  *   Sends a message on the network. (actually simply queues it in a global queue, to be picked by a daemon's thread)
  * For requests, the end-to-end id must be set (see fd_msg_get_eteid / MSGFL_ALLOC_ETEID).
- * For answers, the message must be created with function fd_msg_new_answ.
+ * For answers, the message must be created with function fd_msg_new_answer_from_req.
  *
  * The routing module will handle sending to the correct peer, usually based on the Destination-Realm / Destination-Host AVP.
  *
  * If the msg is a request, there are two ways of receiving the answer:
  *  - either having registered a callback in the dispatch module (see fd_disp_register)
- *  - or provide a callback as parameter here. If such callback is provided, it is called before the dispatch callbacks.
- *    The prototype for this callback function is:
+ *  - or provide a anscb callback here. If such callback is provided, it is called before the dispatch callbacks.
+ *    The prototype for this anscb callback function is:
  *     void anscb(void * data, struct msg ** answer)
  *	where:
  *		data   : opaque data that was registered along with the callback.
@@ -488,12 +489,19 @@
  * 
  * If no callback is registered to handle an answer, the message is discarded and an error is logged.
  *
- *  fd_msg_send_timeout is similar to fd_msg_send, except that it takes an additional argument "timeout" and can be called
- * only with requests as parameters, and an anscb callback.
- * If the matching answer or error is received before the timeout date passes, everything occurs as with fd_msg_send. Otherwise,
- * the request is removed from the queue (meaning the matching answer will be discarded upon reception) and passed to the answcb 
- * function. This function can easily distinguish between timeout case and answer case by checking if the message received is 
- * a request. Upon return, if the *msg parameter is not NULL, it is freed (not passed to other callbacks).
+ *  fd_msg_send_timeout is similar to fd_msg_send, except that it takes two additional arguments "expirecb" and "timeout". 
+ * If the message parameter is an answer, there is no difference with fd_msg_send.
+ * Otherwise, if the corresponding answer (or error) is received before the timeout date elapses, everything occurs as with fd_msg_send. 
+ * Otherwise, the request is removed from the queue (meaning the matching answer will be discarded upon reception) and passed to the expirecb 
+ * function. Upon return, if the *msg parameter is not NULL, it is freed (not passed to other callbacks). 
+ * expirecb is called in a dedicated thread.
+ * 
+ *    The prototype for the expirecb callback function is:
+ *     void expirecb(void * data, struct peer_hdr * sentto, struct msg ** request)
+ *	where:
+ *		data   : opaque data that was registered along with the callback.
+ *              sentto : pointer to the peer to which the message was sent and no answer received within timeout.
+ *		request: location of the pointer to the request that was not answered.
  *
  * RETURN VALUE:
  *  0      	: The message has been queued for sending (sending may fail asynchronously).
@@ -501,7 +509,7 @@
  *  ...
  */
 int fd_msg_send ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data );
-int fd_msg_send_timeout ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data, const struct timespec *timeout );
+int fd_msg_send_timeout ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data, void (*expirecb)(void *, DiamId_t, size_t, struct msg **), const struct timespec *timeout );
 
 /*
  * FUNCTION:	fd_msg_rescode_set
--- a/include/freeDiameter/libfdproto.h	Fri Mar 29 17:11:45 2013 +0800
+++ b/include/freeDiameter/libfdproto.h	Fri Mar 29 17:30:59 2013 +0800
@@ -2426,21 +2426,22 @@
  * FUNCTION:	fd_msg_anscb_associate, fd_msg_anscb_get
  *
  * PARAMETERS:
- *  msg		: the answer message
+ *  msg		: the request message
  *  anscb	: the callback to associate with the message
  *  data	: the data to pass to the callback
+ *  expirecb    : the expiration callback to associate with the message
  *  timeout     : (optional, use NULL if no timeout) a timeout associated with calling the cb.
  *
  * DESCRIPTION:
- *  Associate or retrieve a callback with an answer message.
+ *  Associate or retrieve callbacks with an message.
  * This is meant to be called from the daemon only.
  *
  * RETURN VALUE:
  *  0 	  : ok
  *  EINVAL: a parameter is invalid
  */
-int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void  * data, const struct timespec *timeout );
-int fd_msg_anscb_get      ( struct msg * msg, void (**anscb)(void *, struct msg **), void ** data );
+int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void  * data, void (*expirecb)(void *, DiamId_t, size_t, struct msg **), const struct timespec *timeout );
+int fd_msg_anscb_get( struct msg * msg, void (**anscb)(void *, struct msg **), void (**expirecb)(void *, DiamId_t, size_t, struct msg **), void ** data );
 struct timespec *fd_msg_anscb_gettimeout( struct msg * msg ); /* returns NULL or a valid non-0 timespec */
 
 /*
--- a/libfdcore/fdcore-internal.h	Fri Mar 29 17:11:45 2013 +0800
+++ b/libfdcore/fdcore-internal.h	Fri Mar 29 17:30:59 2013 +0800
@@ -133,7 +133,7 @@
 	long            cnt; /* number of requests in the srs list */
 	pthread_mutex_t	mtx; /* mutex to protect these lists */
 	pthread_cond_t  cnd; /* cond var used by the thread that handles timeouts */
-	pthread_t       thr; /* the thread that handles timeouts (and calls the anscb) */
+	pthread_t       thr; /* the thread that handles timeouts (expirecb called in separate forked threads) */
 };
 
 /* Peers */
--- a/libfdcore/messages.c	Fri Mar 29 17:11:45 2013 +0800
+++ b/libfdcore/messages.c	Fri Mar 29 17:30:59 2013 +0800
@@ -320,7 +320,7 @@
 	CHECK_PARAMS( pmsg );
 	
 	/* Save the callback in the message */
-	CHECK_FCT(  fd_msg_anscb_associate( *pmsg, anscb, data, NULL /* we should maybe use a safeguard here like 1 hour or so? */ )  );
+	CHECK_FCT(  fd_msg_anscb_associate( *pmsg, anscb, data, NULL, NULL /* we should maybe use a safeguard here like 1 hour or so? */ )  );
 	
 	/* Post the message in the outgoing queue */
 	CHECK_FCT( fd_fifo_post(fd_g_outgoing, pmsg) );
@@ -329,13 +329,13 @@
 }
 
 /* The variation of the same function with a timeout callback */
-int fd_msg_send_timeout ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data, const struct timespec *timeout )
+int fd_msg_send_timeout ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data, void (*expirecb)(void *, DiamId_t, size_t, struct msg **), const struct timespec *timeout )
 {
-	TRACE_ENTRY("%p %p %p", pmsg, anscb, data, timeout);
-	CHECK_PARAMS( pmsg && anscb && timeout );
+	TRACE_ENTRY("%p %p %p %p %p", pmsg, anscb, data, expirecb, timeout);
+	CHECK_PARAMS( pmsg && expirecb && timeout );
 	
 	/* Save the callback in the message, with the timeout */
-	CHECK_FCT(  fd_msg_anscb_associate( *pmsg, anscb, data, timeout )  );
+	CHECK_FCT(  fd_msg_anscb_associate( *pmsg, anscb, data, expirecb, timeout )  );
 	
 	/* Post the message in the outgoing queue */
 	CHECK_FCT( fd_fifo_post(fd_g_outgoing, pmsg) );
--- a/libfdcore/p_sr.c	Fri Mar 29 17:11:45 2013 +0800
+++ b/libfdcore/p_sr.c	Fri Mar 29 17:30:59 2013 +0800
@@ -84,12 +84,17 @@
 	}
 }
 
+struct expire_data {
+	struct msg * request;
+	struct fd_peer * sentto;
+};
+
 /* (detached) thread that calls the anscb on expired messages. 
   We do it in a separate thread to avoid blocking the reception of new messages during this time */
-static void * call_anscb_expire(void * arg) {
-	struct msg * expired_req = arg;
+static void * call_expirecb(void * arg) {
+	struct expire_data * ed = arg;
 	
-	void (*anscb)(void *, struct msg **);
+	void (*expirecb)(void *, DiamId_t, size_t, struct msg **);
 	void * data;
 	
 	TRACE_ENTRY("%p", arg);
@@ -102,21 +107,23 @@
 	TRACE_DEBUG(INFO, "The expiration timer for a request has been reached, abording this attempt now & calling cb...");
 	
 	/* Retrieve callback in the message */
-	CHECK_FCT_DO( fd_msg_anscb_get( expired_req, &anscb, &data ), return NULL);
-	ASSERT(anscb);
+	CHECK_FCT_DO( fd_msg_anscb_get( ed->request, NULL, &expirecb, &data ), return NULL);
+	ASSERT(expirecb);
 	
 	/* Clean up this data from the message */
-	CHECK_FCT_DO( fd_msg_anscb_associate( expired_req, NULL, NULL, NULL ), return NULL);
+	CHECK_FCT_DO( fd_msg_anscb_associate( ed->request, NULL, NULL, NULL, NULL ), return NULL);
 
 	/* Call it */
-	(*anscb)(data, &expired_req);
+	(*expirecb)(data, ed->sentto->p_hdr.info.pi_diamid, ed->sentto->p_hdr.info.pi_diamidlen, &ed->request);
 	
 	/* If the callback did not dispose of the message, do it now */
-	if (expired_req) {
-		fd_msg_log(FD_MSG_LOG_DROPPED, expired_req, "Expiration period completed without an answer, and the expiry callback did not dispose of the message.");
-		CHECK_FCT_DO( fd_msg_free(expired_req), /* ignore */ );
+	if (ed->request) {
+		fd_msg_log(FD_MSG_LOG_DROPPED, ed->request, "Expiration period completed without an answer, and the expiry callback did not dispose of the message.");
+		CHECK_FCT_DO( fd_msg_free(ed->request), /* ignore */ );
 	}
 	
+	free(ed);
+	
 	/* Finish */
 	return NULL;
 }
@@ -124,8 +131,8 @@
 /* thread that handles messages expiring. The thread is started only when needed */
 static void * sr_expiry_th(void * arg) {
 	struct sr_list * srlist = arg;
-	struct msg * expired_req;
 	pthread_attr_t detached;
+	struct expire_data * ed;
 	
 	TRACE_ENTRY("%p", arg);
 	CHECK_PARAMS_DO( arg, return NULL );
@@ -176,12 +183,14 @@
 		}
 		
 		/* Now, the first request in the list is expired; remove it and call the anscb for it in a new thread */
+		CHECK_MALLOC_DO( ed = malloc(sizeof(struct expire_data)), goto error );
+		ed->sentto = first->chain.head->o;
+		ed->request = first->req;
 		fd_list_unlink(&first->chain);
 		fd_list_unlink(&first->expire);
-		expired_req = first->req;
 		free(first);
 		
-		CHECK_POSIX_DO( pthread_create( &th, &detached, call_anscb_expire, expired_req ), goto error );
+		CHECK_POSIX_DO( pthread_create( &th, &detached, call_expirecb, ed ), goto error );
 
 		/* loop */
 	} while (1);
--- a/libfdcore/routing_dispatch.c	Fri Mar 29 17:11:45 2013 +0800
+++ b/libfdcore/routing_dispatch.c	Fri Mar 29 17:30:59 2013 +0800
@@ -466,7 +466,7 @@
 		CHECK_FCT( fd_msg_answ_getq( msgptr, &qry ) );
 
 		/* Retrieve any registered handler */
-		CHECK_FCT( fd_msg_anscb_get( qry, &anscb, &data ) );
+		CHECK_FCT( fd_msg_anscb_get( qry, &anscb, NULL, &data ) );
 
 		/* If a callback was registered, pass the message to it */
 		if (anscb != NULL) {
@@ -792,6 +792,8 @@
 	struct avp * avp;
 	struct rtd_candidate * c;
 	struct msg *msgptr = msg;
+	DiamId_t qry_src = NULL;
+	size_t qry_src_len = 0;
 	
 	/* Read the message header */
 	CHECK_FCT( fd_msg_hdr(msgptr, &hdr) );
@@ -800,8 +802,6 @@
 	/* For answers, the routing is very easy */
 	if ( ! is_req ) {
 		struct msg * qry;
-		DiamId_t qry_src = NULL;
-		size_t qry_src_len = 0;
 		struct msg_hdr * qry_hdr;
 		struct fd_peer * peer = NULL;
 
@@ -831,6 +831,8 @@
 	}
 	
 	/* From that point, the message is a request */
+	CHECK_FCT( fd_msg_source_get( msgptr, &qry_src, &qry_src_len ) );
+	/* if qry_src != NULL, this message is relayed, otherwise it is locally issued */
 
 	/* Get the routing data out of the message if any (in case of re-transmit) */
 	CHECK_FCT( fd_msg_rt_get ( msgptr, &rtd ) );
--- a/libfdproto/messages.c	Fri Mar 29 17:11:45 2013 +0800
+++ b/libfdproto/messages.c	Fri Mar 29 17:30:59 2013 +0800
@@ -121,10 +121,11 @@
 	struct rt_data		*msg_rtdata;		/* Routing list for the query */
 	struct session		*msg_sess;		/* Cached message session if any */
 	struct {
-			void (*fct)(void *, struct msg **);
+			void (*anscb)(void *, struct msg **);
+			void (*expirecb)(void *, DiamId_t, size_t, struct msg **);
 			void * data;
 			struct timespec timeout;
-		}		 msg_cb;		/* Callback to be called when an answer is received, if not NULL */
+		}		 msg_cb;		/* Callback to be called when an answer is received, or timeout expires, if not NULL */
 	DiamId_t		 msg_src_id;		/* Diameter Id of the peer this message was received from. This string is malloc'd and must be freed */
 	size_t			 msg_src_id_len;	/* cached length of this string */
 	struct timespec		 msg_ts_rcv;		/* Timestamp when this message was received from the network */
@@ -774,8 +775,8 @@
 		msg->msg_public.msg_hbhid,
 		msg->msg_public.msg_eteid
 		) );
-	CHECK_FCT( dump_add_str(outstr, offset, outlen, INOBJHDR "intern: rwb:%p rt:%d cb:%p(%p) qry:%p asso:%d sess:%p src:%s(%zd)|", 
-			INOBJHDRVAL, msg->msg_rawbuffer, msg->msg_routable, msg->msg_cb.fct, msg->msg_cb.data, msg->msg_query, msg->msg_associated, msg->msg_sess, msg->msg_src_id?:"(nil)", msg->msg_src_id_len) );
+	CHECK_FCT( dump_add_str(outstr, offset, outlen, INOBJHDR "intern: rwb:%p rt:%d cb:%p,%p(%p) qry:%p asso:%d sess:%p src:%s(%zd)|", 
+			INOBJHDRVAL, msg->msg_rawbuffer, msg->msg_routable, msg->msg_cb.anscb, msg->msg_cb.expirecb, msg->msg_cb.data, msg->msg_query, msg->msg_associated, msg->msg_sess, msg->msg_src_id?:"(nil)", msg->msg_src_id_len) );
 	return 0;
 }
 
@@ -1029,9 +1030,9 @@
 }
 
 /* Associate / get answer callbacks */
-int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void  * data, const struct timespec *timeout )
+int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void  * data, void (*expirecb)(void *, DiamId_t, size_t, struct msg **), const struct timespec *timeout )
 {
-	TRACE_ENTRY("%p %p %p", msg, anscb, data);
+	TRACE_ENTRY("%p %p %p %p", msg, anscb, expirecb, data);
 	
 	/* Check the parameters */
 	CHECK_PARAMS( CHECK_MSG(msg) );
@@ -1039,10 +1040,12 @@
 	if (! (msg->msg_public.msg_flags & CMD_FLAG_REQUEST ))
 		return anscb ? EINVAL : 0; /* we associate with requests only */
 	
-	CHECK_PARAMS( (anscb == NULL) || (msg->msg_cb.fct == NULL) ); /* We are not overwritting a cb */
+	CHECK_PARAMS( (anscb == NULL)    || (msg->msg_cb.anscb == NULL) ); /* We are not overwritting a cb */
+	CHECK_PARAMS( (expirecb == NULL) || (msg->msg_cb.expirecb == NULL) ); /* We are not overwritting a cb */
 	
 	/* Associate callback and data with the message, if any */
-	msg->msg_cb.fct = anscb;
+	msg->msg_cb.anscb = anscb;
+	msg->msg_cb.expirecb = expirecb;
 	msg->msg_cb.data = data;
 	if (timeout) {
 		memcpy(&msg->msg_cb.timeout, timeout, sizeof(struct timespec));
@@ -1051,16 +1054,20 @@
 	return 0;
 }	
 
-int fd_msg_anscb_get( struct msg * msg, void (**anscb)(void *, struct msg **), void ** data )
+int fd_msg_anscb_get( struct msg * msg, void (**anscb)(void *, struct msg **), void (**expirecb)(void *, DiamId_t, size_t, struct msg **), void ** data )
 {
-	TRACE_ENTRY("%p %p %p", msg, anscb, data);
+	TRACE_ENTRY("%p %p %p %p", msg, anscb, expirecb, data);
 	
 	/* Check the parameters */
-	CHECK_PARAMS( CHECK_MSG(msg) && anscb && data );
+	CHECK_PARAMS( CHECK_MSG(msg) );
 	
 	/* Copy the result */
-	*anscb = msg->msg_cb.fct;
-	*data  = msg->msg_cb.data;
+	if (anscb)
+		*anscb = msg->msg_cb.anscb;
+	if (data)
+		*data  = msg->msg_cb.data;
+	if (expirecb)
+		*expirecb = msg->msg_cb.expirecb;
 	
 	return 0;
 }
"Welcome to our mercurial repository"