comparison libfdcore/p_out.c @ 1207:043b894b0511

Cleanups in the failover situation to avoid deadlocks and corrupted message ids. Tested OK now.
author Sebastien Decugis <sdecugis@freediameter.net>
date Fri, 14 Jun 2013 17:30:42 +0800
parents 56c36d1007b4
children 8f9684264fe0
comparison
equal deleted inserted replaced
1206:ef7c5e39badf 1207:043b894b0511
72 } 72 }
73 73
74 /* Log the message */ 74 /* Log the message */
75 fd_hook_call(HOOK_MESSAGE_SENT, cpy_for_logs_only, peer, NULL, fd_msg_pmdl_get(cpy_for_logs_only)); 75 fd_hook_call(HOOK_MESSAGE_SENT, cpy_for_logs_only, peer, NULL, fd_msg_pmdl_get(cpy_for_logs_only));
76 76
77 pthread_cleanup_push((void *)fd_msg_free, *msg /* might be NULL, no problem */);
78
77 /* Send the message */ 79 /* Send the message */
78 CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz), ); 80 CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz), );
81
82 pthread_cleanup_pop(0);
83
79 out: 84 out:
80 ; 85 ;
81 pthread_cleanup_pop(1); 86 pthread_cleanup_pop(1);
82 87
83 if (ret) 88 if (ret)
90 } 95 }
91 96
92 return 0; 97 return 0;
93 } 98 }
94 99
95 static void cleanup_requeue(void * arg)
96 {
97 struct msg *msg = arg;
98 CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &msg),
99 {
100 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "An error occurred while attempting to requeue this message during cancellation of the sending function", fd_msg_pmdl_get(msg));
101 CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */);
102 } );
103 }
104
105 /* The code of the "out" thread */ 100 /* The code of the "out" thread */
106 static void * out_thr(void * arg) 101 static void * out_thr(void * arg)
107 { 102 {
108 struct fd_peer * peer = arg; 103 struct fd_peer * peer = arg;
104 int stop = 0;
105 struct msg * msg;
109 ASSERT( CHECK_PEER(peer) ); 106 ASSERT( CHECK_PEER(peer) );
110 107
111 /* Set the thread name */ 108 /* Set the thread name */
112 { 109 {
113 char buf[48]; 110 char buf[48];
114 snprintf(buf, sizeof(buf), "OUT/%s", peer->p_hdr.info.pi_diamid); 111 snprintf(buf, sizeof(buf), "OUT/%s", peer->p_hdr.info.pi_diamid);
115 fd_log_threadname ( buf ); 112 fd_log_threadname ( buf );
116 } 113 }
117 114
118 /* Loop until cancelation */ 115 /* Loop until cancelation */
119 while (1) { 116 while (!stop) {
120 struct msg * msg;
121 int ret; 117 int ret;
122 118
123 /* Retrieve next message to send */ 119 /* Retrieve next message to send */
124 CHECK_FCT_DO( fd_fifo_get(peer->p_tosend, &msg), goto error ); 120 CHECK_FCT_DO( fd_fifo_get(peer->p_tosend, &msg), goto error );
125
126 /* Now if we are cancelled, we requeue this message */
127 pthread_cleanup_push(cleanup_requeue, msg);
128 121
129 /* Send the message, log any error */ 122 /* Send the message, log any error */
130 CHECK_FCT_DO( ret = do_send(&msg, peer->p_cnxctx, &peer->p_hbh, peer), 123 CHECK_FCT_DO( ret = do_send(&msg, peer->p_cnxctx, &peer->p_hbh, peer),
131 { 124 {
132 if (msg) { 125 if (msg) {
133 char buf[256]; 126 char buf[256];
134 snprintf(buf, sizeof(buf), "Error while sending this message: %s", strerror(ret)); 127 snprintf(buf, sizeof(buf), "Error while sending this message: %s", strerror(ret));
135 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, buf, fd_msg_pmdl_get(msg)); 128 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, buf, fd_msg_pmdl_get(msg));
136 fd_msg_free(msg); 129 fd_msg_free(msg);
137 } 130 }
131 stop = 1;
138 } ); 132 } );
139 133
140 /* Loop */ 134 }
141 pthread_cleanup_pop(0); 135
142 } 136 /* If we're here it means there was an error on the socket. We need to continue purging the fifo until we are canceled */
143 137 CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ );
138
139 /* Requeue all routable messages in the global "out" queue, until we are canceled once the PSM deals with the CNX_ERROR sent above */
140 while ( fd_fifo_get(peer->p_tosend, &msg) == 0 ) {
141 if (fd_msg_is_routable(msg)) {
142 CHECK_FCT_DO(fd_fifo_post_noblock(peer->p_tofailover, (void *)&msg),
143 {
144 /* fallback: destroy the message */
145 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "Internal error: unable to requeue this message during failover process", fd_msg_pmdl_get(msg));
146 CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */)
147 } );
148 } else {
149 /* Just free it */
150 /* fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Non-routable message freed during handover", fd_msg_pmdl_get(m)); */
151 CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */)
152 }
153 }
154
144 error: 155 error:
145 /* It is not really a connection error, but the effect is the same, we are not able to send any more messages */ 156 /* It is not really a connection error, but the effect is the same, we are not able to send any more messages */
146 CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ ); 157 CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ );
147 return NULL; 158 return NULL;
148 } 159 }
"Welcome to our mercurial repository"