Navigation


Changeset 1207:043b894b0511 in freeDiameter for libfdcore/p_out.c


Ignore:
Timestamp:
Jun 14, 2013, 6:30:42 PM (11 years ago)
Author:
Sebastien Decugis <sdecugis@freediameter.net>
Branch:
default
Phase:
public
Message:

Cleanups in failover situation to avoid deadlocks and corrupt messages ids. Tested OK now.

File:
1 edited

Legend:

Unmodified
Added
Removed
  • libfdcore/p_out.c

    r1186 r1207  
    75 75        fd_hook_call(HOOK_MESSAGE_SENT, cpy_for_logs_only, peer, NULL, fd_msg_pmdl_get(cpy_for_logs_only));
    76 76       
     77        pthread_cleanup_push((void *)fd_msg_free, *msg /* might be NULL, no problem */);
     78       
    77 79        /* Send the message */
    78 80        CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz), );
     81       
     82        pthread_cleanup_pop(0);
     83       
    79 84 out:
    80 85        ;       
     
    93 98 }
    94 99
    95 static void cleanup_requeue(void * arg)
    96 {
    97         struct msg *msg = arg;
    98         CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &msg),
    99                 {
    100                         fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "An error occurred while attempting to requeue this message during cancellation of the sending function", fd_msg_pmdl_get(msg));
    101                         CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */);
    102                 } );
    103 }
    104 
    105 100 /* The code of the "out" thread */
    106 101 static void * out_thr(void * arg)
    107 102 {
    108 103        struct fd_peer * peer = arg;
     104        int stop = 0;
     105        struct msg * msg;
    109 106        ASSERT( CHECK_PEER(peer) );
    110 107       
     
    117 114       
    118 115        /* Loop until cancelation */
    119         while (1) {
    120                 struct msg * msg;
     116        while (!stop) {
    121 117                int ret;
    122 118               
    123 119                /* Retrieve next message to send */
    124 120                CHECK_FCT_DO( fd_fifo_get(peer->p_tosend, &msg), goto error );
    125                
    126                 /* Now if we are cancelled, we requeue this message */
    127                 pthread_cleanup_push(cleanup_requeue, msg);
    128 121               
    129 122                /* Send the message, log any error */
     
    136 129                                        fd_msg_free(msg);
    137 130                                }
     131                                stop = 1;
    138 132                        } );
    139 133                       
    140                 /* Loop */
    141                 pthread_cleanup_pop(0);
    142         }
    143        
     134        }
     135       
     136        /* If we're here it means there was an error on the socket. We need to continue to purge the fifo & until we are canceled */
     137        CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ );
     138       
     139        /* Requeue all routable messages in the global "out" queue, until we are canceled once the PSM deals with the CNX_ERROR sent above */
     140        while ( fd_fifo_get(peer->p_tosend, &msg) == 0 ) {
     141                if (fd_msg_is_routable(msg)) {
     142                        CHECK_FCT_DO(fd_fifo_post_noblock(peer->p_tofailover, (void *)&msg),
     143                                {
     144                                        /* fallback: destroy the message */
     145                                        fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "Internal error: unable to requeue this message during failover process", fd_msg_pmdl_get(msg));
     146                                        CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */)
     147                                } );
     148                } else {
     149                        /* Just free it */
     150                        /* fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Non-routable message freed during handover", fd_msg_pmdl_get(m)); */
     151                        CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */)
     152                }
     153        }
     154
    144 155 error:
    145 156        /* It is not really a connection error, but the effect is the same, we are not able to send anymore message */
Note: See TracChangeset for help on using the changeset viewer.