Navigation


Changeset 1014:908ffbb81f60 in freeDiameter


Ignore:
Timestamp:
Mar 29, 2013, 6:30:59 PM (11 years ago)
Author:
Sebastien Decugis <sdecugis@freediameter.net>
Branch:
default
Phase:
public
Message:

Added a second callback in fd_msg_send_timeout to handle more easily the timeout situation

Files:
8 edited

Legend:

Unmodified
Added
Removed
  • extensions/dbg_interactive/messages.i

    r1013 r1014  
    4141struct anscb_py_layer {
    4242        PyObject * cb;
     43        PyObject * expcb;
    4344        PyObject * data;
    4445};
     
    6768                result = PyObject_CallFunction(l->cb, "(OO)", PyMsg, l->data);
    6869                Py_XDECREF(l->cb);
     70                Py_XDECREF(l->expcb);
    6971                Py_XDECREF(l->data);
    7072                free(l);
     
    8688                /* it actually happens automatically when we do nothing. */
    8789}
     90
     91static void expcb_python(void *cbdata, DiamId_t sentto, size_t senttolen, struct msg ** msg) {
     92        /* The python callback is received in cbdata */
     93        PyObject * result, *PyMsg;
     94        struct anscb_py_layer * l = cbdata;
     95       
     96        if (!l) {
     97                fd_log_debug("Internal error! Python callback disappeared...");
     98                return;
     99        }
     100       
     101        SWIG_PYTHON_THREAD_BEGIN_BLOCK;
     102
     103        if (!msg || !*msg) {
     104                PyMsg = Py_None;
     105        } else {
     106                PyMsg = SWIG_NewPointerObj((void *)*msg,     SWIGTYPE_p_msg,     0 );
     107        }
     108
     109        result = PyObject_CallFunction(l->expcb, "(Os#O)", PyMsg, sentto, senttolen, l->data);
     110        Py_XDECREF(l->cb);
     111        Py_XDECREF(l->expcb);
     112        Py_XDECREF(l->data);
     113        free(l);
     114
     115        /* The callback is supposed to return a message or NULL */
     116        if (!SWIG_IsOK(SWIG_ConvertPtr(result, (void *)msg, SWIGTYPE_p_msg, SWIG_POINTER_DISOWN))) {
     117                fd_log_debug("Error: Cannot convert the return value to message.");
     118                *msg = NULL;
     119        }
     120
     121        Py_XDECREF(result);
     122
     123        SWIG_PYTHON_THREAD_END_BLOCK;
     124               
     125}
     126
     127
     128
    88129%}
    89130
     
    128169        /* SEND THE MESSAGE */
    129170        %delobject send; /* when this has been called, the msg must not be freed anymore */
    130         void send(PyObject * PyCb = NULL, PyObject * data = NULL, unsigned int timeout = 0) {
     171        void send(PyObject * PyCb = NULL, PyObject * data = NULL, PyObject * PyExpCb = NULL, unsigned int timeout = 0) {
    131172                int ret;
    132173                struct msg * m = $self;
     
    142183                        Py_XINCREF(PyCb);
    143184                        Py_XINCREF(data);
     185                        Py_XINCREF(PyExpCb);
     186                        l->expcb = PyExpCb;
    144187                        l->cb = PyCb;
    145188                        l->data = data;
     
    150193                        (void) clock_gettime(CLOCK_REALTIME, &ts);
    151194                        ts.tv_sec += timeout;
    152                         ret = fd_msg_send_timeout(&m, anscb_python, l, &ts);
     195                        ret = fd_msg_send_timeout(&m, anscb_python, l, expcb_python, &ts);
    153196                } else {
    154197                        ret = fd_msg_send(&m, PyCb ? anscb_python : NULL, l);
  • include/freeDiameter/libfdcore.h

    r1010 r1014  
    463463 * PARAMETERS:
    464464 *  pmsg        : Location of the message to be sent on the network (set to NULL on function return to avoid double deletion).
    465  *  anscb       : A callback to be called when answer is received, if msg is a request (optional for fd_msg_send)
    466  *  anscb_data  : opaque data to be passed back to the anscb when it is called.
     465 *  anscb       : A callback to be called when corresponding answer is received, when sending a request (not used with answers)
     466 *  anscb_data  : opaque data to be passed back to the anscb (or expirecb) when it is called.
     467 *  expirecb    : (only for fd_msg_send_timeout) If the request did not get an answer before timeout, this callback is called.
    467468 *  timeout     : (only for fd_msg_send_timeout) sets the absolute time until when to wait for an answer. Past this time,
    468  *                the anscb is called with the request as parameter and the answer will be discarded when received.
     469 *                the expirecb is called with the request and the answer will be discarded if received later.
    469470 *
    470471 * DESCRIPTION:
    471472 *   Sends a message on the network. (actually simply queues it in a global queue, to be picked by a daemon's thread)
    472473 * For requests, the end-to-end id must be set (see fd_msg_get_eteid / MSGFL_ALLOC_ETEID).
    473  * For answers, the message must be created with function fd_msg_new_answ.
     474 * For answers, the message must be created with function fd_msg_new_answer_from_req.
    474475 *
    475476 * The routing module will handle sending to the correct peer, usually based on the Destination-Realm / Destination-Host AVP.
     
    477478 * If the msg is a request, there are two ways of receiving the answer:
    478479 *  - either having registered a callback in the dispatch module (see fd_disp_register)
    479  *  - or provide a callback as parameter here. If such callback is provided, it is called before the dispatch callbacks.
    480  *    The prototype for this callback function is:
     480 *  - or provide a anscb callback here. If such callback is provided, it is called before the dispatch callbacks.
     481 *    The prototype for this anscb callback function is:
    481482 *     void anscb(void * data, struct msg ** answer)
    482483 *      where:
     
    489490 * If no callback is registered to handle an answer, the message is discarded and an error is logged.
    490491 *
    491  *  fd_msg_send_timeout is similar to fd_msg_send, except that it takes an additional argument "timeout" and can be called
    492  * only with requests as parameters, and an anscb callback.
    493  * If the matching answer or error is received before the timeout date passes, everything occurs as with fd_msg_send. Otherwise,
    494  * the request is removed from the queue (meaning the matching answer will be discarded upon reception) and passed to the answcb
    495  * function. This function can easily distinguish between timeout case and answer case by checking if the message received is
    496  * a request. Upon return, if the *msg parameter is not NULL, it is freed (not passed to other callbacks).
     492 *  fd_msg_send_timeout is similar to fd_msg_send, except that it takes two additional arguments "expirecb" and "timeout".
     493 * If the message parameter is an answer, there is no difference with fd_msg_send.
     494 * Otherwise, if the corresponding answer (or error) is received before the timeout date elapses, everything occurs as with fd_msg_send.
     495 * Otherwise, the request is removed from the queue (meaning the matching answer will be discarded upon reception) and passed to the expirecb
     496 * function. Upon return, if the *msg parameter is not NULL, it is freed (not passed to other callbacks).
     497 * expirecb is called in a dedicated thread.
     498 *
     499 *    The prototype for the expirecb callback function is:
     500 *     void expirecb(void * data, struct peer_hdr * sentto, struct msg ** request)
     501 *      where:
     502 *              data   : opaque data that was registered along with the callback.
     503 *              sentto : pointer to the peer to which the message was sent and no answer received within timeout.
     504 *              request: location of the pointer to the request that was not answered.
    497505 *
    498506 * RETURN VALUE:
     
    502510 */
    503511int fd_msg_send ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data );
    504 int fd_msg_send_timeout ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data, const struct timespec *timeout );
     512int fd_msg_send_timeout ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data, void (*expirecb)(void *, DiamId_t, size_t, struct msg **), const struct timespec *timeout );
    505513
    506514/*
  • include/freeDiameter/libfdproto.h

    r1004 r1014  
    24272427 *
    24282428 * PARAMETERS:
    2429  *  msg         : the answer message
     2429 *  msg         : the request message
    24302430 *  anscb       : the callback to associate with the message
    24312431 *  data        : the data to pass to the callback
     2432 *  expirecb    : the expiration callback to associate with the message
    24322433 *  timeout     : (optional, use NULL if no timeout) a timeout associated with calling the cb.
    24332434 *
    24342435 * DESCRIPTION:
    2435  *  Associate or retrieve a callback with an answer message.
     2436 *  Associate or retrieve callbacks with an message.
    24362437 * This is meant to be called from the daemon only.
    24372438 *
     
    24402441 *  EINVAL: a parameter is invalid
    24412442 */
    2442 int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void  * data, const struct timespec *timeout );
    2443 int fd_msg_anscb_get      ( struct msg * msg, void (**anscb)(void *, struct msg **), void ** data );
     2443int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void  * data, void (*expirecb)(void *, DiamId_t, size_t, struct msg **), const struct timespec *timeout );
     2444int fd_msg_anscb_get( struct msg * msg, void (**anscb)(void *, struct msg **), void (**expirecb)(void *, DiamId_t, size_t, struct msg **), void ** data );
    24442445struct timespec *fd_msg_anscb_gettimeout( struct msg * msg ); /* returns NULL or a valid non-0 timespec */
    24452446
  • libfdcore/fdcore-internal.h

    r1010 r1014  
    134134        pthread_mutex_t mtx; /* mutex to protect these lists */
    135135        pthread_cond_t  cnd; /* cond var used by the thread that handles timeouts */
    136         pthread_t       thr; /* the thread that handles timeouts (and calls the anscb) */
     136        pthread_t       thr; /* the thread that handles timeouts (expirecb called in separate forked threads) */
    137137};
    138138
  • libfdcore/messages.c

    r928 r1014  
    321321       
    322322        /* Save the callback in the message */
    323         CHECK_FCT(  fd_msg_anscb_associate( *pmsg, anscb, data, NULL /* we should maybe use a safeguard here like 1 hour or so? */ )  );
     323        CHECK_FCT(  fd_msg_anscb_associate( *pmsg, anscb, data, NULL, NULL /* we should maybe use a safeguard here like 1 hour or so? */ )  );
    324324       
    325325        /* Post the message in the outgoing queue */
     
    330330
    331331/* The variation of the same function with a timeout callback */
    332 int fd_msg_send_timeout ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data, const struct timespec *timeout )
    333 {
    334         TRACE_ENTRY("%p %p %p", pmsg, anscb, data, timeout);
    335         CHECK_PARAMS( pmsg && anscb && timeout );
     332int fd_msg_send_timeout ( struct msg ** pmsg, void (*anscb)(void *, struct msg **), void * data, void (*expirecb)(void *, DiamId_t, size_t, struct msg **), const struct timespec *timeout )
     333{
     334        TRACE_ENTRY("%p %p %p %p %p", pmsg, anscb, data, expirecb, timeout);
     335        CHECK_PARAMS( pmsg && expirecb && timeout );
    336336       
    337337        /* Save the callback in the message, with the timeout */
    338         CHECK_FCT(  fd_msg_anscb_associate( *pmsg, anscb, data, timeout )  );
     338        CHECK_FCT(  fd_msg_anscb_associate( *pmsg, anscb, data, expirecb, timeout )  );
    339339       
    340340        /* Post the message in the outgoing queue */
  • libfdcore/p_sr.c

    r974 r1014  
    8585}
    8686
     87struct expire_data {
     88        struct msg * request;
     89        struct fd_peer * sentto;
     90};
     91
    8792/* (detached) thread that calls the anscb on expired messages.
    8893  We do it in a separate thread to avoid blocking the reception of new messages during this time */
    89 static void * call_anscb_expire(void * arg) {
    90         struct msg * expired_req = arg;
    91        
    92         void (*anscb)(void *, struct msg **);
     94static void * call_expirecb(void * arg) {
     95        struct expire_data * ed = arg;
     96       
     97        void (*expirecb)(void *, DiamId_t, size_t, struct msg **);
    9398        void * data;
    9499       
     
    103108       
    104109        /* Retrieve callback in the message */
    105         CHECK_FCT_DO( fd_msg_anscb_get( expired_req, &anscb, &data ), return NULL);
    106         ASSERT(anscb);
     110        CHECK_FCT_DO( fd_msg_anscb_get( ed->request, NULL, &expirecb, &data ), return NULL);
     111        ASSERT(expirecb);
    107112       
    108113        /* Clean up this data from the message */
    109         CHECK_FCT_DO( fd_msg_anscb_associate( expired_req, NULL, NULL, NULL ), return NULL);
     114        CHECK_FCT_DO( fd_msg_anscb_associate( ed->request, NULL, NULL, NULL, NULL ), return NULL);
    110115
    111116        /* Call it */
    112         (*anscb)(data, &expired_req);
     117        (*expirecb)(data, ed->sentto->p_hdr.info.pi_diamid, ed->sentto->p_hdr.info.pi_diamidlen, &ed->request);
    113118       
    114119        /* If the callback did not dispose of the message, do it now */
    115         if (expired_req) {
    116                 fd_msg_log(FD_MSG_LOG_DROPPED, expired_req, "Expiration period completed without an answer, and the expiry callback did not dispose of the message.");
    117                 CHECK_FCT_DO( fd_msg_free(expired_req), /* ignore */ );
    118         }
     120        if (ed->request) {
     121                fd_msg_log(FD_MSG_LOG_DROPPED, ed->request, "Expiration period completed without an answer, and the expiry callback did not dispose of the message.");
     122                CHECK_FCT_DO( fd_msg_free(ed->request), /* ignore */ );
     123        }
     124       
     125        free(ed);
    119126       
    120127        /* Finish */
     
    125132static void * sr_expiry_th(void * arg) {
    126133        struct sr_list * srlist = arg;
    127         struct msg * expired_req;
    128134        pthread_attr_t detached;
     135        struct expire_data * ed;
    129136       
    130137        TRACE_ENTRY("%p", arg);
     
    177184               
    178185                /* Now, the first request in the list is expired; remove it and call the anscb for it in a new thread */
     186                CHECK_MALLOC_DO( ed = malloc(sizeof(struct expire_data)), goto error );
     187                ed->sentto = first->chain.head->o;
     188                ed->request = first->req;
    179189                fd_list_unlink(&first->chain);
    180190                fd_list_unlink(&first->expire);
    181                 expired_req = first->req;
    182191                free(first);
    183192               
    184                 CHECK_POSIX_DO( pthread_create( &th, &detached, call_anscb_expire, expired_req ), goto error );
     193                CHECK_POSIX_DO( pthread_create( &th, &detached, call_expirecb, ed ), goto error );
    185194
    186195                /* loop */
  • libfdcore/routing_dispatch.c

    r974 r1014  
    467467
    468468                /* Retrieve any registered handler */
    469                 CHECK_FCT( fd_msg_anscb_get( qry, &anscb, &data ) );
     469                CHECK_FCT( fd_msg_anscb_get( qry, &anscb, NULL, &data ) );
    470470
    471471                /* If a callback was registered, pass the message to it */
     
    793793        struct rtd_candidate * c;
    794794        struct msg *msgptr = msg;
     795        DiamId_t qry_src = NULL;
     796        size_t qry_src_len = 0;
    795797       
    796798        /* Read the message header */
     
    801803        if ( ! is_req ) {
    802804                struct msg * qry;
    803                 DiamId_t qry_src = NULL;
    804                 size_t qry_src_len = 0;
    805805                struct msg_hdr * qry_hdr;
    806806                struct fd_peer * peer = NULL;
     
    832832       
    833833        /* From that point, the message is a request */
     834        CHECK_FCT( fd_msg_source_get( msgptr, &qry_src, &qry_src_len ) );
     835        /* if qry_src != NULL, this message is relayed, otherwise it is locally issued */
    834836
    835837        /* Get the routing data out of the message if any (in case of re-transmit) */
  • libfdproto/messages.c

    r1009 r1014  
    122122        struct session          *msg_sess;              /* Cached message session if any */
    123123        struct {
    124                         void (*fct)(void *, struct msg **);
     124                        void (*anscb)(void *, struct msg **);
     125                        void (*expirecb)(void *, DiamId_t, size_t, struct msg **);
    125126                        void * data;
    126127                        struct timespec timeout;
    127                 }                msg_cb;                /* Callback to be called when an answer is received, if not NULL */
     128                }                msg_cb;                /* Callback to be called when an answer is received, or timeout expires, if not NULL */
    128129        DiamId_t                 msg_src_id;            /* Diameter Id of the peer this message was received from. This string is malloc'd and must be freed */
    129130        size_t                   msg_src_id_len;        /* cached length of this string */
     
    775776                msg->msg_public.msg_eteid
    776777                ) );
    777         CHECK_FCT( dump_add_str(outstr, offset, outlen, INOBJHDR "intern: rwb:%p rt:%d cb:%p(%p) qry:%p asso:%d sess:%p src:%s(%zd)|",
    778                         INOBJHDRVAL, msg->msg_rawbuffer, msg->msg_routable, msg->msg_cb.fct, msg->msg_cb.data, msg->msg_query, msg->msg_associated, msg->msg_sess, msg->msg_src_id?:"(nil)", msg->msg_src_id_len) );
     778        CHECK_FCT( dump_add_str(outstr, offset, outlen, INOBJHDR "intern: rwb:%p rt:%d cb:%p,%p(%p) qry:%p asso:%d sess:%p src:%s(%zd)|",
     779                        INOBJHDRVAL, msg->msg_rawbuffer, msg->msg_routable, msg->msg_cb.anscb, msg->msg_cb.expirecb, msg->msg_cb.data, msg->msg_query, msg->msg_associated, msg->msg_sess, msg->msg_src_id?:"(nil)", msg->msg_src_id_len) );
    779780        return 0;
    780781}
     
    10301031
    10311032/* Associate / get answer callbacks */
    1032 int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void  * data, const struct timespec *timeout )
    1033 {
    1034         TRACE_ENTRY("%p %p %p", msg, anscb, data);
     1033int fd_msg_anscb_associate( struct msg * msg, void ( *anscb)(void *, struct msg **), void  * data, void (*expirecb)(void *, DiamId_t, size_t, struct msg **), const struct timespec *timeout )
     1034{
     1035        TRACE_ENTRY("%p %p %p %p", msg, anscb, expirecb, data);
    10351036       
    10361037        /* Check the parameters */
     
    10401041                return anscb ? EINVAL : 0; /* we associate with requests only */
    10411042       
    1042         CHECK_PARAMS( (anscb == NULL) || (msg->msg_cb.fct == NULL) ); /* We are not overwritting a cb */
     1043        CHECK_PARAMS( (anscb == NULL)    || (msg->msg_cb.anscb == NULL) ); /* We are not overwritting a cb */
     1044        CHECK_PARAMS( (expirecb == NULL) || (msg->msg_cb.expirecb == NULL) ); /* We are not overwritting a cb */
    10431045       
    10441046        /* Associate callback and data with the message, if any */
    1045         msg->msg_cb.fct = anscb;
     1047        msg->msg_cb.anscb = anscb;
     1048        msg->msg_cb.expirecb = expirecb;
    10461049        msg->msg_cb.data = data;
    10471050        if (timeout) {
     
    10521055}       
    10531056
    1054 int fd_msg_anscb_get( struct msg * msg, void (**anscb)(void *, struct msg **), void ** data )
    1055 {
    1056         TRACE_ENTRY("%p %p %p", msg, anscb, data);
     1057int fd_msg_anscb_get( struct msg * msg, void (**anscb)(void *, struct msg **), void (**expirecb)(void *, DiamId_t, size_t, struct msg **), void ** data )
     1058{
     1059        TRACE_ENTRY("%p %p %p %p", msg, anscb, expirecb, data);
    10571060       
    10581061        /* Check the parameters */
    1059         CHECK_PARAMS( CHECK_MSG(msg) && anscb && data );
     1062        CHECK_PARAMS( CHECK_MSG(msg) );
    10601063       
    10611064        /* Copy the result */
    1062         *anscb = msg->msg_cb.fct;
    1063         *data  = msg->msg_cb.data;
     1065        if (anscb)
     1066                *anscb = msg->msg_cb.anscb;
     1067        if (data)
     1068                *data  = msg->msg_cb.data;
     1069        if (expirecb)
     1070                *expirecb = msg->msg_cb.expirecb;
    10641071       
    10651072        return 0;
Note: See TracChangeset for help on using the changeset viewer.