# HG changeset patch # User Sebastien Decugis # Date 1364549459 -28800 # Node ID 908ffbb81f60c18de477303f19fbba0e221f1d0d # Parent 7b5c46505e0947b6a0c953ac23100689076bda6c Added a second callback in fd_msg_send_timeout to handle more easily the timeout situation diff -r 7b5c46505e09 -r 908ffbb81f60 extensions/dbg_interactive/messages.i --- a/extensions/dbg_interactive/messages.i Fri Mar 29 17:11:45 2013 +0800 +++ b/extensions/dbg_interactive/messages.i Fri Mar 29 17:30:59 2013 +0800 @@ -40,6 +40,7 @@ %{ struct anscb_py_layer { PyObject * cb; + PyObject * expcb; PyObject * data; }; @@ -66,6 +67,7 @@ result = PyObject_CallFunction(l->cb, "(OO)", PyMsg, l->data); Py_XDECREF(l->cb); + Py_XDECREF(l->expcb); Py_XDECREF(l->data); free(l); @@ -85,6 +87,45 @@ /* in this case, just delete the message */ /* it actually happens automatically when we do nothing. */ } + +static void expcb_python(void *cbdata, DiamId_t sentto, size_t senttolen, struct msg ** msg) { + /* The python callback is received in cbdata */ + PyObject * result, *PyMsg; + struct anscb_py_layer * l = cbdata; + + if (!l) { + fd_log_debug("Internal error! Python callback disappeared..."); + return; + } + + SWIG_PYTHON_THREAD_BEGIN_BLOCK; + + if (!msg || !*msg) { + PyMsg = Py_None; + } else { + PyMsg = SWIG_NewPointerObj((void *)*msg, SWIGTYPE_p_msg, 0 ); + } + + result = PyObject_CallFunction(l->expcb, "(Os#O)", PyMsg, sentto, senttolen, l->data); + Py_XDECREF(l->cb); + Py_XDECREF(l->expcb); + Py_XDECREF(l->data); + free(l); + + /* The callback is supposed to return a message or NULL */ + if (!SWIG_IsOK(SWIG_ConvertPtr(result, (void *)msg, SWIGTYPE_p_msg, SWIG_POINTER_DISOWN))) { + fd_log_debug("Error: Cannot convert the return value to message."); + *msg = NULL; + } + + Py_XDECREF(result); + + SWIG_PYTHON_THREAD_END_BLOCK; + +} + + + %} struct msg { @@ -127,7 +168,7 @@ /* SEND THE MESSAGE */ %delobject send; /* when this has been called, the msg must not be freed anymore */ - void send(PyObject * PyCb = NULL, PyObject * data = NULL, unsigned int timeout = 0) { + void send(PyObject * PyCb = NULL, PyObject * data = NULL, PyObject * PyExpCb = NULL, unsigned int timeout = 0) { int ret; struct msg * m = $self; struct anscb_py_layer * l = NULL; @@ -141,6 +182,8 @@ Py_XINCREF(PyCb); Py_XINCREF(data); + Py_XINCREF(PyExpCb); + l->expcb = PyExpCb; l->cb = PyCb; l->data = data; } @@ -149,7 +192,7 @@ struct timespec ts; (void) clock_gettime(CLOCK_REALTIME, &ts); ts.tv_sec += timeout; - ret = fd_msg_send_timeout(&m, anscb_python, l, &ts); + ret = fd_msg_send_timeout(&m, anscb_python, l, expcb_python, &ts); } else { ret = fd_msg_send(&m, PyCb ? anscb_python : NULL, l); } diff -r 7b5c46505e09 -r 908ffbb81f60 include/freeDiameter/libfdcore.h --- a/include/freeDiameter/libfdcore.h Fri Mar 29 17:11:45 2013 +0800 +++ b/include/freeDiameter/libfdcore.h Fri Mar 29 17:30:59 2013 +0800 @@ -462,22 +462,23 @@ * * PARAMETERS: * pmsg : Location of the message to be sent on the network (set to NULL on function return to avoid double deletion). - * anscb : A callback to be called when answer is received, if msg is a request (optional for fd_msg_send) - * anscb_data : opaque data to be passed back to the anscb when it is called. + * anscb : A callback to be called when corresponding answer is received, when sending a request (not used with answers) + * anscb_data : opaque data to be passed back to the anscb (or expirecb) when it is called. + * expirecb : (only for fd_msg_send_timeout) If the request did not get an answer before timeout, this callback is called. * timeout : (only for fd_msg_send_timeout) sets the absolute time until when to wait for an answer. Past this time, - * the anscb is called with the request as parameter and the answer will be discarded when received. + * the expirecb is called with the request and the answer will be discarded if received later. * * DESCRIPTION: * Sends a message on the network. (actually simply queues it in a global queue, to be picked by a daemon's thread) * For requests, the end-to-end id must be set (see fd_msg_get_eteid / MSGFL_ALLOC_ETEID). - * For answers, the message must be created with function fd_msg_new_answ. + * For answers, the message must be created with function fd_msg_new_answer_from_req. * * The routing module will handle sending to the correct peer, usually based on the Destination-Realm / Destination-Host AVP. * * If the msg is a request, there are two ways of receiving the answer: * - either having registered a callback in the dispatch module (see fd_disp_register) - * - or provide a callback as parameter here. If such callback is provided, it is called before the dispatch callbacks. - * The prototype for this callback function is: + * - or provide a anscb callback here. If such callback is provided, it is called before the dispatch callbacks. + * The prototype for this anscb callback function is: * void anscb(void * data, struct msg ** answer) * where: * data : opaque data that was registered along with the callback. @@ -488,12 +489,19 @@ * * If no callback is registered to handle an answer, the message is discarded and an error is logged. * - * fd_msg_send_timeout is similar to fd_msg_send, except that it takes an additional argument "timeout" and can be called - * only with requests as parameters, and an anscb callback. - * If the matching answer or error is received before the timeout date passes, everything occurs as with fd_msg_send. Otherwise, - * the request is removed from the queue (meaning the matching answer will be discarded upon reception) and passed to the answcb - * function. This function can easily distinguish between timeout case and answer case by checking if the message received is - * a request. Upon return, if the *msg parameter is not NULL, it is freed (not passed to other callbacks). + * fd_msg_send_timeout is similar to fd_msg_send, except that it takes two additional arguments "expirecb" and "timeout". + * If the message parameter is an answer, there is no difference with fd_msg_send. + * Otherwise, if the corresponding answer (or error) is received before the timeout date elapses, everything occurs as with fd_msg_send. + * Otherwise, the request is removed from the queue (meaning the matching answer will be discarded upon reception) and passed to the expirecb + * function. Upon return, if the *msg parameter is not NULL, it is freed (not passed to other callbacks). + * expirecb is called in a dedicated thread. + * + * The prototype for the expirecb callback function is: + * void expirecb(void * data, struct peer_hdr * sentto, struct msg ** request) + * where: + * data : opaque data that was registered along with the callback. + * sentto : pointer to the peer to which the message was sent and no answer received within timeout. + * request: location of the pointer to the request that was not answered. * * RETURN VALUE: * 0 : The message has been queued for sending (sending may fail asynchronously). @@ -501,7 +509,7 @@ * ... */ int fd_msg_send ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data ); -int fd_msg_send_timeout ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data, const struct timespec *timeout ); +int fd_msg_send_timeout ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data, void (*expirecb)(void *, DiamId_t, size_t, struct msg **), const struct timespec *timeout ); /* * FUNCTION: fd_msg_rescode_set diff -r 7b5c46505e09 -r 908ffbb81f60 include/freeDiameter/libfdproto.h --- a/include/freeDiameter/libfdproto.h Fri Mar 29 17:11:45 2013 +0800 +++ b/include/freeDiameter/libfdproto.h Fri Mar 29 17:30:59 2013 +0800 @@ -2426,21 +2426,22 @@ * FUNCTION: fd_msg_anscb_associate, fd_msg_anscb_get * * PARAMETERS: - * msg : the answer message + * msg : the request message * anscb : the callback to associate with the message * data : the data to pass to the callback + * expirecb : the expiration callback to associate with the message * timeout : (optional, use NULL if no timeout) a timeout associated with calling the cb. * * DESCRIPTION: - * Associate or retrieve a callback with an answer message. + * Associate or retrieve callbacks with an message. * This is meant to be called from the daemon only. * * RETURN VALUE: * 0 : ok * EINVAL: a parameter is invalid */ -int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void * data, const struct timespec *timeout ); -int fd_msg_anscb_get ( struct msg * msg, void (**anscb)(void *, struct msg **), void ** data ); +int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void * data, void (*expirecb)(void *, DiamId_t, size_t, struct msg **), const struct timespec *timeout ); +int fd_msg_anscb_get( struct msg * msg, void (**anscb)(void *, struct msg **), void (**expirecb)(void *, DiamId_t, size_t, struct msg **), void ** data ); struct timespec *fd_msg_anscb_gettimeout( struct msg * msg ); /* returns NULL or a valid non-0 timespec */ /* diff -r 7b5c46505e09 -r 908ffbb81f60 libfdcore/fdcore-internal.h --- a/libfdcore/fdcore-internal.h Fri Mar 29 17:11:45 2013 +0800 +++ b/libfdcore/fdcore-internal.h Fri Mar 29 17:30:59 2013 +0800 @@ -133,7 +133,7 @@ long cnt; /* number of requests in the srs list */ pthread_mutex_t mtx; /* mutex to protect these lists */ pthread_cond_t cnd; /* cond var used by the thread that handles timeouts */ - pthread_t thr; /* the thread that handles timeouts (and calls the anscb) */ + pthread_t thr; /* the thread that handles timeouts (expirecb called in separate forked threads) */ }; /* Peers */ diff -r 7b5c46505e09 -r 908ffbb81f60 libfdcore/messages.c --- a/libfdcore/messages.c Fri Mar 29 17:11:45 2013 +0800 +++ b/libfdcore/messages.c Fri Mar 29 17:30:59 2013 +0800 @@ -320,7 +320,7 @@ CHECK_PARAMS( pmsg ); /* Save the callback in the message */ - CHECK_FCT( fd_msg_anscb_associate( *pmsg, anscb, data, NULL /* we should maybe use a safeguard here like 1 hour or so? */ ) ); + CHECK_FCT( fd_msg_anscb_associate( *pmsg, anscb, data, NULL, NULL /* we should maybe use a safeguard here like 1 hour or so? */ ) ); /* Post the message in the outgoing queue */ CHECK_FCT( fd_fifo_post(fd_g_outgoing, pmsg) ); @@ -329,13 +329,13 @@ } /* The variation of the same function with a timeout callback */ -int fd_msg_send_timeout ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data, const struct timespec *timeout ) +int fd_msg_send_timeout ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data, void (*expirecb)(void *, DiamId_t, size_t, struct msg **), const struct timespec *timeout ) { - TRACE_ENTRY("%p %p %p", pmsg, anscb, data, timeout); - CHECK_PARAMS( pmsg && anscb && timeout ); + TRACE_ENTRY("%p %p %p %p %p", pmsg, anscb, data, expirecb, timeout); + CHECK_PARAMS( pmsg && expirecb && timeout ); /* Save the callback in the message, with the timeout */ - CHECK_FCT( fd_msg_anscb_associate( *pmsg, anscb, data, timeout ) ); + CHECK_FCT( fd_msg_anscb_associate( *pmsg, anscb, data, expirecb, timeout ) ); /* Post the message in the outgoing queue */ CHECK_FCT( fd_fifo_post(fd_g_outgoing, pmsg) ); diff -r 7b5c46505e09 -r 908ffbb81f60 libfdcore/p_sr.c --- a/libfdcore/p_sr.c Fri Mar 29 17:11:45 2013 +0800 +++ b/libfdcore/p_sr.c Fri Mar 29 17:30:59 2013 +0800 @@ -84,12 +84,17 @@ } } +struct expire_data { + struct msg * request; + struct fd_peer * sentto; +}; + /* (detached) thread that calls the anscb on expired messages. We do it in a separate thread to avoid blocking the reception of new messages during this time */ -static void * call_anscb_expire(void * arg) { - struct msg * expired_req = arg; +static void * call_expirecb(void * arg) { + struct expire_data * ed = arg; - void (*anscb)(void *, struct msg **); + void (*expirecb)(void *, DiamId_t, size_t, struct msg **); void * data; TRACE_ENTRY("%p", arg); @@ -102,21 +107,23 @@ TRACE_DEBUG(INFO, "The expiration timer for a request has been reached, abording this attempt now & calling cb..."); /* Retrieve callback in the message */ - CHECK_FCT_DO( fd_msg_anscb_get( expired_req, &anscb, &data ), return NULL); - ASSERT(anscb); + CHECK_FCT_DO( fd_msg_anscb_get( ed->request, NULL, &expirecb, &data ), return NULL); + ASSERT(expirecb); /* Clean up this data from the message */ - CHECK_FCT_DO( fd_msg_anscb_associate( expired_req, NULL, NULL, NULL ), return NULL); + CHECK_FCT_DO( fd_msg_anscb_associate( ed->request, NULL, NULL, NULL, NULL ), return NULL); /* Call it */ - (*anscb)(data, &expired_req); + (*expirecb)(data, ed->sentto->p_hdr.info.pi_diamid, ed->sentto->p_hdr.info.pi_diamidlen, &ed->request); /* If the callback did not dispose of the message, do it now */ - if (expired_req) { - fd_msg_log(FD_MSG_LOG_DROPPED, expired_req, "Expiration period completed without an answer, and the expiry callback did not dispose of the message."); - CHECK_FCT_DO( fd_msg_free(expired_req), /* ignore */ ); + if (ed->request) { + fd_msg_log(FD_MSG_LOG_DROPPED, ed->request, "Expiration period completed without an answer, and the expiry callback did not dispose of the message."); + CHECK_FCT_DO( fd_msg_free(ed->request), /* ignore */ ); } + free(ed); + /* Finish */ return NULL; } @@ -124,8 +131,8 @@ /* thread that handles messages expiring. The thread is started only when needed */ static void * sr_expiry_th(void * arg) { struct sr_list * srlist = arg; - struct msg * expired_req; pthread_attr_t detached; + struct expire_data * ed; TRACE_ENTRY("%p", arg); CHECK_PARAMS_DO( arg, return NULL ); @@ -176,12 +183,14 @@ } /* Now, the first request in the list is expired; remove it and call the anscb for it in a new thread */ + CHECK_MALLOC_DO( ed = malloc(sizeof(struct expire_data)), goto error ); + ed->sentto = first->chain.head->o; + ed->request = first->req; fd_list_unlink(&first->chain); fd_list_unlink(&first->expire); - expired_req = first->req; free(first); - CHECK_POSIX_DO( pthread_create( &th, &detached, call_anscb_expire, expired_req ), goto error ); + CHECK_POSIX_DO( pthread_create( &th, &detached, call_expirecb, ed ), goto error ); /* loop */ } while (1); diff -r 7b5c46505e09 -r 908ffbb81f60 libfdcore/routing_dispatch.c --- a/libfdcore/routing_dispatch.c Fri Mar 29 17:11:45 2013 +0800 +++ b/libfdcore/routing_dispatch.c Fri Mar 29 17:30:59 2013 +0800 @@ -466,7 +466,7 @@ CHECK_FCT( fd_msg_answ_getq( msgptr, &qry ) ); /* Retrieve any registered handler */ - CHECK_FCT( fd_msg_anscb_get( qry, &anscb, &data ) ); + CHECK_FCT( fd_msg_anscb_get( qry, &anscb, NULL, &data ) ); /* If a callback was registered, pass the message to it */ if (anscb != NULL) { @@ -792,6 +792,8 @@ struct avp * avp; struct rtd_candidate * c; struct msg *msgptr = msg; + DiamId_t qry_src = NULL; + size_t qry_src_len = 0; /* Read the message header */ CHECK_FCT( fd_msg_hdr(msgptr, &hdr) ); @@ -800,8 +802,6 @@ /* For answers, the routing is very easy */ if ( ! is_req ) { struct msg * qry; - DiamId_t qry_src = NULL; - size_t qry_src_len = 0; struct msg_hdr * qry_hdr; struct fd_peer * peer = NULL; @@ -831,6 +831,8 @@ } /* From that point, the message is a request */ + CHECK_FCT( fd_msg_source_get( msgptr, &qry_src, &qry_src_len ) ); + /* if qry_src != NULL, this message is relayed, otherwise it is locally issued */ /* Get the routing data out of the message if any (in case of re-transmit) */ CHECK_FCT( fd_msg_rt_get ( msgptr, &rtd ) ); diff -r 7b5c46505e09 -r 908ffbb81f60 libfdproto/messages.c --- a/libfdproto/messages.c Fri Mar 29 17:11:45 2013 +0800 +++ b/libfdproto/messages.c Fri Mar 29 17:30:59 2013 +0800 @@ -121,10 +121,11 @@ struct rt_data *msg_rtdata; /* Routing list for the query */ struct session *msg_sess; /* Cached message session if any */ struct { - void (*fct)(void *, struct msg **); + void (*anscb)(void *, struct msg **); + void (*expirecb)(void *, DiamId_t, size_t, struct msg **); void * data; struct timespec timeout; - } msg_cb; /* Callback to be called when an answer is received, if not NULL */ + } msg_cb; /* Callback to be called when an answer is received, or timeout expires, if not NULL */ DiamId_t msg_src_id; /* Diameter Id of the peer this message was received from. This string is malloc'd and must be freed */ size_t msg_src_id_len; /* cached length of this string */ struct timespec msg_ts_rcv; /* Timestamp when this message was received from the network */ @@ -774,8 +775,8 @@ msg->msg_public.msg_hbhid, msg->msg_public.msg_eteid ) ); - CHECK_FCT( dump_add_str(outstr, offset, outlen, INOBJHDR "intern: rwb:%p rt:%d cb:%p(%p) qry:%p asso:%d sess:%p src:%s(%zd)|", - INOBJHDRVAL, msg->msg_rawbuffer, msg->msg_routable, msg->msg_cb.fct, msg->msg_cb.data, msg->msg_query, msg->msg_associated, msg->msg_sess, msg->msg_src_id?:"(nil)", msg->msg_src_id_len) ); + CHECK_FCT( dump_add_str(outstr, offset, outlen, INOBJHDR "intern: rwb:%p rt:%d cb:%p,%p(%p) qry:%p asso:%d sess:%p src:%s(%zd)|", + INOBJHDRVAL, msg->msg_rawbuffer, msg->msg_routable, msg->msg_cb.anscb, msg->msg_cb.expirecb, msg->msg_cb.data, msg->msg_query, msg->msg_associated, msg->msg_sess, msg->msg_src_id?:"(nil)", msg->msg_src_id_len) ); return 0; } @@ -1029,9 +1030,9 @@ } /* Associate / get answer callbacks */ -int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void * data, const struct timespec *timeout ) +int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void * data, void (*expirecb)(void *, DiamId_t, size_t, struct msg **), const struct timespec *timeout ) { - TRACE_ENTRY("%p %p %p", msg, anscb, data); + TRACE_ENTRY("%p %p %p %p", msg, anscb, expirecb, data); /* Check the parameters */ CHECK_PARAMS( CHECK_MSG(msg) ); @@ -1039,10 +1040,12 @@ if (! (msg->msg_public.msg_flags & CMD_FLAG_REQUEST )) return anscb ? EINVAL : 0; /* we associate with requests only */ - CHECK_PARAMS( (anscb == NULL) || (msg->msg_cb.fct == NULL) ); /* We are not overwritting a cb */ + CHECK_PARAMS( (anscb == NULL) || (msg->msg_cb.anscb == NULL) ); /* We are not overwritting a cb */ + CHECK_PARAMS( (expirecb == NULL) || (msg->msg_cb.expirecb == NULL) ); /* We are not overwritting a cb */ /* Associate callback and data with the message, if any */ - msg->msg_cb.fct = anscb; + msg->msg_cb.anscb = anscb; + msg->msg_cb.expirecb = expirecb; msg->msg_cb.data = data; if (timeout) { memcpy(&msg->msg_cb.timeout, timeout, sizeof(struct timespec)); @@ -1051,16 +1054,20 @@ return 0; } -int fd_msg_anscb_get( struct msg * msg, void (**anscb)(void *, struct msg **), void ** data ) +int fd_msg_anscb_get( struct msg * msg, void (**anscb)(void *, struct msg **), void (**expirecb)(void *, DiamId_t, size_t, struct msg **), void ** data ) { - TRACE_ENTRY("%p %p %p", msg, anscb, data); + TRACE_ENTRY("%p %p %p %p", msg, anscb, expirecb, data); /* Check the parameters */ - CHECK_PARAMS( CHECK_MSG(msg) && anscb && data ); + CHECK_PARAMS( CHECK_MSG(msg) ); /* Copy the result */ - *anscb = msg->msg_cb.fct; - *data = msg->msg_cb.data; + if (anscb) + *anscb = msg->msg_cb.anscb; + if (data) + *data = msg->msg_cb.data; + if (expirecb) + *expirecb = msg->msg_cb.expirecb; return 0; }