Changeset 1014:908ffbb81f60 in freeDiameter
- Timestamp:
- Mar 29, 2013, 6:30:59 PM (11 years ago)
- Branch:
- default
- Phase:
- public
- Files:
-
- 8 edited
Legend:
- Unmodified
- Added
- Removed
-
extensions/dbg_interactive/messages.i
r1013 r1014 41 41 struct anscb_py_layer { 42 42 PyObject * cb; 43 PyObject * expcb; 43 44 PyObject * data; 44 45 }; … … 67 68 result = PyObject_CallFunction(l->cb, "(OO)", PyMsg, l->data); 68 69 Py_XDECREF(l->cb); 70 Py_XDECREF(l->expcb); 69 71 Py_XDECREF(l->data); 70 72 free(l); … … 86 88 /* it actually happens automatically when we do nothing. */ 87 89 } 90 91 static void expcb_python(void *cbdata, DiamId_t sentto, size_t senttolen, struct msg ** msg) { 92 /* The python callback is received in cbdata */ 93 PyObject * result, *PyMsg; 94 struct anscb_py_layer * l = cbdata; 95 96 if (!l) { 97 fd_log_debug("Internal error! Python callback disappeared..."); 98 return; 99 } 100 101 SWIG_PYTHON_THREAD_BEGIN_BLOCK; 102 103 if (!msg || !*msg) { 104 PyMsg = Py_None; 105 } else { 106 PyMsg = SWIG_NewPointerObj((void *)*msg, SWIGTYPE_p_msg, 0 ); 107 } 108 109 result = PyObject_CallFunction(l->expcb, "(Os#O)", PyMsg, sentto, senttolen, l->data); 110 Py_XDECREF(l->cb); 111 Py_XDECREF(l->expcb); 112 Py_XDECREF(l->data); 113 free(l); 114 115 /* The callback is supposed to return a message or NULL */ 116 if (!SWIG_IsOK(SWIG_ConvertPtr(result, (void *)msg, SWIGTYPE_p_msg, SWIG_POINTER_DISOWN))) { 117 fd_log_debug("Error: Cannot convert the return value to message."); 118 *msg = NULL; 119 } 120 121 Py_XDECREF(result); 122 123 SWIG_PYTHON_THREAD_END_BLOCK; 124 125 } 126 127 128 88 129 %} 89 130 … … 128 169 /* SEND THE MESSAGE */ 129 170 %delobject send; /* when this has been called, the msg must not be freed anymore */ 130 void send(PyObject * PyCb = NULL, PyObject * data = NULL, unsigned int timeout = 0) {171 void send(PyObject * PyCb = NULL, PyObject * data = NULL, PyObject * PyExpCb = NULL, unsigned int timeout = 0) { 131 172 int ret; 132 173 struct msg * m = $self; … … 142 183 Py_XINCREF(PyCb); 143 184 Py_XINCREF(data); 185 Py_XINCREF(PyExpCb); 186 l->expcb = PyExpCb; 144 187 l->cb = PyCb; 145 188 l->data = data; … … 150 193 (void) clock_gettime(CLOCK_REALTIME, &ts); 151 194 ts.tv_sec += timeout; 152 ret = fd_msg_send_timeout(&m, anscb_python, l, &ts);195 ret = fd_msg_send_timeout(&m, anscb_python, l, expcb_python, &ts); 153 196 } else { 154 197 ret = fd_msg_send(&m, PyCb ? anscb_python : NULL, l); -
include/freeDiameter/libfdcore.h
r1010 r1014 463 463 * PARAMETERS: 464 464 * pmsg : Location of the message to be sent on the network (set to NULL on function return to avoid double deletion). 465 * anscb : A callback to be called when answer is received, if msg is a request (optional for fd_msg_send) 466 * anscb_data : opaque data to be passed back to the anscb when it is called. 465 * anscb : A callback to be called when corresponding answer is received, when sending a request (not used with answers) 466 * anscb_data : opaque data to be passed back to the anscb (or expirecb) when it is called. 467 * expirecb : (only for fd_msg_send_timeout) If the request did not get an answer before timeout, this callback is called. 467 468 * timeout : (only for fd_msg_send_timeout) sets the absolute time until when to wait for an answer. Past this time, 468 * the anscb is called with the request as parameter and the answer will be discarded when received.469 * the expirecb is called with the request and the answer will be discarded if received later. 469 470 * 470 471 * DESCRIPTION: 471 472 * Sends a message on the network. (actually simply queues it in a global queue, to be picked by a daemon's thread) 472 473 * For requests, the end-to-end id must be set (see fd_msg_get_eteid / MSGFL_ALLOC_ETEID). 473 * For answers, the message must be created with function fd_msg_new_answ .474 * For answers, the message must be created with function fd_msg_new_answer_from_req. 474 475 * 475 476 * The routing module will handle sending to the correct peer, usually based on the Destination-Realm / Destination-Host AVP. … … 477 478 * If the msg is a request, there are two ways of receiving the answer: 478 479 * - either having registered a callback in the dispatch module (see fd_disp_register) 479 * - or provide a callback as parameterhere. If such callback is provided, it is called before the dispatch callbacks.480 * The prototype for this callback function is:480 * - or provide a anscb callback here. If such callback is provided, it is called before the dispatch callbacks. 481 * The prototype for this anscb callback function is: 481 482 * void anscb(void * data, struct msg ** answer) 482 483 * where: … … 489 490 * If no callback is registered to handle an answer, the message is discarded and an error is logged. 490 491 * 491 * fd_msg_send_timeout is similar to fd_msg_send, except that it takes an additional argument "timeout" and can be called 492 * only with requests as parameters, and an anscb callback. 493 * If the matching answer or error is received before the timeout date passes, everything occurs as with fd_msg_send. Otherwise, 494 * the request is removed from the queue (meaning the matching answer will be discarded upon reception) and passed to the answcb 495 * function. This function can easily distinguish between timeout case and answer case by checking if the message received is 496 * a request. Upon return, if the *msg parameter is not NULL, it is freed (not passed to other callbacks). 492 * fd_msg_send_timeout is similar to fd_msg_send, except that it takes two additional arguments "expirecb" and "timeout". 493 * If the message parameter is an answer, there is no difference with fd_msg_send. 494 * Otherwise, if the corresponding answer (or error) is received before the timeout date elapses, everything occurs as with fd_msg_send. 495 * Otherwise, the request is removed from the queue (meaning the matching answer will be discarded upon reception) and passed to the expirecb 496 * function. Upon return, if the *msg parameter is not NULL, it is freed (not passed to other callbacks). 497 * expirecb is called in a dedicated thread. 498 * 499 * The prototype for the expirecb callback function is: 500 * void expirecb(void * data, struct peer_hdr * sentto, struct msg ** request) 501 * where: 502 * data : opaque data that was registered along with the callback. 503 * sentto : pointer to the peer to which the message was sent and no answer received within timeout. 504 * request: location of the pointer to the request that was not answered. 497 505 * 498 506 * RETURN VALUE: … … 502 510 */ 503 511 int fd_msg_send ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data ); 504 int fd_msg_send_timeout ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data, const struct timespec *timeout );512 int fd_msg_send_timeout ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data, void (*expirecb)(void *, DiamId_t, size_t, struct msg **), const struct timespec *timeout ); 505 513 506 514 /* -
include/freeDiameter/libfdproto.h
r1004 r1014 2427 2427 * 2428 2428 * PARAMETERS: 2429 * msg : the answermessage2429 * msg : the request message 2430 2430 * anscb : the callback to associate with the message 2431 2431 * data : the data to pass to the callback 2432 * expirecb : the expiration callback to associate with the message 2432 2433 * timeout : (optional, use NULL if no timeout) a timeout associated with calling the cb. 2433 2434 * 2434 2435 * DESCRIPTION: 2435 * Associate or retrieve a callback with an answermessage.2436 * Associate or retrieve callbacks with an message. 2436 2437 * This is meant to be called from the daemon only. 2437 2438 * … … 2440 2441 * EINVAL: a parameter is invalid 2441 2442 */ 2442 int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void * data, const struct timespec *timeout );2443 int fd_msg_anscb_get ( struct msg * msg, void (**anscb)(void *, struct msg **), void ** data );2443 int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void * data, void (*expirecb)(void *, DiamId_t, size_t, struct msg **), const struct timespec *timeout ); 2444 int fd_msg_anscb_get( struct msg * msg, void (**anscb)(void *, struct msg **), void (**expirecb)(void *, DiamId_t, size_t, struct msg **), void ** data ); 2444 2445 struct timespec *fd_msg_anscb_gettimeout( struct msg * msg ); /* returns NULL or a valid non-0 timespec */ 2445 2446 -
libfdcore/fdcore-internal.h
r1010 r1014 134 134 pthread_mutex_t mtx; /* mutex to protect these lists */ 135 135 pthread_cond_t cnd; /* cond var used by the thread that handles timeouts */ 136 pthread_t thr; /* the thread that handles timeouts ( and calls the anscb) */136 pthread_t thr; /* the thread that handles timeouts (expirecb called in separate forked threads) */ 137 137 }; 138 138 -
libfdcore/messages.c
r928 r1014 321 321 322 322 /* Save the callback in the message */ 323 CHECK_FCT( fd_msg_anscb_associate( *pmsg, anscb, data, NULL /* we should maybe use a safeguard here like 1 hour or so? */ ) );323 CHECK_FCT( fd_msg_anscb_associate( *pmsg, anscb, data, NULL, NULL /* we should maybe use a safeguard here like 1 hour or so? */ ) ); 324 324 325 325 /* Post the message in the outgoing queue */ … … 330 330 331 331 /* The variation of the same function with a timeout callback */ 332 int fd_msg_send_timeout ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data, const struct timespec *timeout )333 { 334 TRACE_ENTRY("%p %p %p ", pmsg, anscb, data, timeout);335 CHECK_PARAMS( pmsg && anscb && timeout );332 int fd_msg_send_timeout ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data, void (*expirecb)(void *, DiamId_t, size_t, struct msg **), const struct timespec *timeout ) 333 { 334 TRACE_ENTRY("%p %p %p %p %p", pmsg, anscb, data, expirecb, timeout); 335 CHECK_PARAMS( pmsg && expirecb && timeout ); 336 336 337 337 /* Save the callback in the message, with the timeout */ 338 CHECK_FCT( fd_msg_anscb_associate( *pmsg, anscb, data, timeout ) );338 CHECK_FCT( fd_msg_anscb_associate( *pmsg, anscb, data, expirecb, timeout ) ); 339 339 340 340 /* Post the message in the outgoing queue */ -
libfdcore/p_sr.c
r974 r1014 85 85 } 86 86 87 struct expire_data { 88 struct msg * request; 89 struct fd_peer * sentto; 90 }; 91 87 92 /* (detached) thread that calls the anscb on expired messages. 88 93 We do it in a separate thread to avoid blocking the reception of new messages during this time */ 89 static void * call_ anscb_expire(void * arg) {90 struct msg * expired_req= arg;91 92 void (* anscb)(void *, struct msg **);94 static void * call_expirecb(void * arg) { 95 struct expire_data * ed = arg; 96 97 void (*expirecb)(void *, DiamId_t, size_t, struct msg **); 93 98 void * data; 94 99 … … 103 108 104 109 /* Retrieve callback in the message */ 105 CHECK_FCT_DO( fd_msg_anscb_get( e xpired_req, &anscb, &data ), return NULL);106 ASSERT( anscb);110 CHECK_FCT_DO( fd_msg_anscb_get( ed->request, NULL, &expirecb, &data ), return NULL); 111 ASSERT(expirecb); 107 112 108 113 /* Clean up this data from the message */ 109 CHECK_FCT_DO( fd_msg_anscb_associate( e xpired_req, NULL, NULL, NULL ), return NULL);114 CHECK_FCT_DO( fd_msg_anscb_associate( ed->request, NULL, NULL, NULL, NULL ), return NULL); 110 115 111 116 /* Call it */ 112 (* anscb)(data, &expired_req);117 (*expirecb)(data, ed->sentto->p_hdr.info.pi_diamid, ed->sentto->p_hdr.info.pi_diamidlen, &ed->request); 113 118 114 119 /* If the callback did not dispose of the message, do it now */ 115 if (expired_req) { 116 fd_msg_log(FD_MSG_LOG_DROPPED, expired_req, "Expiration period completed without an answer, and the expiry callback did not dispose of the message."); 117 CHECK_FCT_DO( fd_msg_free(expired_req), /* ignore */ ); 118 } 120 if (ed->request) { 121 fd_msg_log(FD_MSG_LOG_DROPPED, ed->request, "Expiration period completed without an answer, and the expiry callback did not dispose of the message."); 122 CHECK_FCT_DO( fd_msg_free(ed->request), /* ignore */ ); 123 } 124 125 free(ed); 119 126 120 127 /* Finish */ … … 125 132 static void * sr_expiry_th(void * arg) { 126 133 struct sr_list * srlist = arg; 127 struct msg * expired_req;128 134 pthread_attr_t detached; 135 struct expire_data * ed; 129 136 130 137 TRACE_ENTRY("%p", arg); … … 177 184 178 185 /* Now, the first request in the list is expired; remove it and call the anscb for it in a new thread */ 186 CHECK_MALLOC_DO( ed = malloc(sizeof(struct expire_data)), goto error ); 187 ed->sentto = first->chain.head->o; 188 ed->request = first->req; 179 189 fd_list_unlink(&first->chain); 180 190 fd_list_unlink(&first->expire); 181 expired_req = first->req;182 191 free(first); 183 192 184 CHECK_POSIX_DO( pthread_create( &th, &detached, call_ anscb_expire, expired_req), goto error );193 CHECK_POSIX_DO( pthread_create( &th, &detached, call_expirecb, ed ), goto error ); 185 194 186 195 /* loop */ -
libfdcore/routing_dispatch.c
r974 r1014 467 467 468 468 /* Retrieve any registered handler */ 469 CHECK_FCT( fd_msg_anscb_get( qry, &anscb, &data ) );469 CHECK_FCT( fd_msg_anscb_get( qry, &anscb, NULL, &data ) ); 470 470 471 471 /* If a callback was registered, pass the message to it */ … … 793 793 struct rtd_candidate * c; 794 794 struct msg *msgptr = msg; 795 DiamId_t qry_src = NULL; 796 size_t qry_src_len = 0; 795 797 796 798 /* Read the message header */ … … 801 803 if ( ! is_req ) { 802 804 struct msg * qry; 803 DiamId_t qry_src = NULL;804 size_t qry_src_len = 0;805 805 struct msg_hdr * qry_hdr; 806 806 struct fd_peer * peer = NULL; … … 832 832 833 833 /* From that point, the message is a request */ 834 CHECK_FCT( fd_msg_source_get( msgptr, &qry_src, &qry_src_len ) ); 835 /* if qry_src != NULL, this message is relayed, otherwise it is locally issued */ 834 836 835 837 /* Get the routing data out of the message if any (in case of re-transmit) */ -
libfdproto/messages.c
r1009 r1014 122 122 struct session *msg_sess; /* Cached message session if any */ 123 123 struct { 124 void (*fct)(void *, struct msg **); 124 void (*anscb)(void *, struct msg **); 125 void (*expirecb)(void *, DiamId_t, size_t, struct msg **); 125 126 void * data; 126 127 struct timespec timeout; 127 } msg_cb; /* Callback to be called when an answer is received, if not NULL */128 } msg_cb; /* Callback to be called when an answer is received, or timeout expires, if not NULL */ 128 129 DiamId_t msg_src_id; /* Diameter Id of the peer this message was received from. This string is malloc'd and must be freed */ 129 130 size_t msg_src_id_len; /* cached length of this string */ … … 775 776 msg->msg_public.msg_eteid 776 777 ) ); 777 CHECK_FCT( dump_add_str(outstr, offset, outlen, INOBJHDR "intern: rwb:%p rt:%d cb:%p (%p) qry:%p asso:%d sess:%p src:%s(%zd)|",778 INOBJHDRVAL, msg->msg_rawbuffer, msg->msg_routable, msg->msg_cb. fct, msg->msg_cb.data, msg->msg_query, msg->msg_associated, msg->msg_sess, msg->msg_src_id?:"(nil)", msg->msg_src_id_len) );778 CHECK_FCT( dump_add_str(outstr, offset, outlen, INOBJHDR "intern: rwb:%p rt:%d cb:%p,%p(%p) qry:%p asso:%d sess:%p src:%s(%zd)|", 779 INOBJHDRVAL, msg->msg_rawbuffer, msg->msg_routable, msg->msg_cb.anscb, msg->msg_cb.expirecb, msg->msg_cb.data, msg->msg_query, msg->msg_associated, msg->msg_sess, msg->msg_src_id?:"(nil)", msg->msg_src_id_len) ); 779 780 return 0; 780 781 } … … 1030 1031 1031 1032 /* Associate / get answer callbacks */ 1032 int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void * data, const struct timespec *timeout )1033 { 1034 TRACE_ENTRY("%p %p %p ", msg, anscb, data);1033 int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void * data, void (*expirecb)(void *, DiamId_t, size_t, struct msg **), const struct timespec *timeout ) 1034 { 1035 TRACE_ENTRY("%p %p %p %p", msg, anscb, expirecb, data); 1035 1036 1036 1037 /* Check the parameters */ … … 1040 1041 return anscb ? EINVAL : 0; /* we associate with requests only */ 1041 1042 1042 CHECK_PARAMS( (anscb == NULL) || (msg->msg_cb.fct == NULL) ); /* We are not overwritting a cb */ 1043 CHECK_PARAMS( (anscb == NULL) || (msg->msg_cb.anscb == NULL) ); /* We are not overwritting a cb */ 1044 CHECK_PARAMS( (expirecb == NULL) || (msg->msg_cb.expirecb == NULL) ); /* We are not overwritting a cb */ 1043 1045 1044 1046 /* Associate callback and data with the message, if any */ 1045 msg->msg_cb.fct = anscb; 1047 msg->msg_cb.anscb = anscb; 1048 msg->msg_cb.expirecb = expirecb; 1046 1049 msg->msg_cb.data = data; 1047 1050 if (timeout) { … … 1052 1055 } 1053 1056 1054 int fd_msg_anscb_get( struct msg * msg, void (**anscb)(void *, struct msg **), void ** data )1055 { 1056 TRACE_ENTRY("%p %p %p ", msg, anscb, data);1057 int fd_msg_anscb_get( struct msg * msg, void (**anscb)(void *, struct msg **), void (**expirecb)(void *, DiamId_t, size_t, struct msg **), void ** data ) 1058 { 1059 TRACE_ENTRY("%p %p %p %p", msg, anscb, expirecb, data); 1057 1060 1058 1061 /* Check the parameters */ 1059 CHECK_PARAMS( CHECK_MSG(msg) && anscb && data);1062 CHECK_PARAMS( CHECK_MSG(msg) ); 1060 1063 1061 1064 /* Copy the result */ 1062 *anscb = msg->msg_cb.fct; 1063 *data = msg->msg_cb.data; 1065 if (anscb) 1066 *anscb = msg->msg_cb.anscb; 1067 if (data) 1068 *data = msg->msg_cb.data; 1069 if (expirecb) 1070 *expirecb = msg->msg_cb.expirecb; 1064 1071 1065 1072 return 0;
Note: See TracChangeset
for help on using the changeset viewer.