comparison libfdcore/peers.c @ 1207:043b894b0511

Cleanups in failover situations to avoid deadlocks and corrupted message IDs. Tested OK now.
author Sebastien Decugis <sdecugis@freediameter.net>
date Fri, 14 Jun 2013 17:30:42 +0800
parents d2608e47db28
children 8f9684264fe0
comparison
equal deleted inserted replaced
1206:ef7c5e39badf 1207:043b894b0511
75 CHECK_POSIX( pthread_mutex_init(&p->p_state_mtx, NULL) ); 75 CHECK_POSIX( pthread_mutex_init(&p->p_state_mtx, NULL) );
76 76
77 fd_list_init(&p->p_actives, p); 77 fd_list_init(&p->p_actives, p);
78 fd_list_init(&p->p_expiry, p); 78 fd_list_init(&p->p_expiry, p);
79 CHECK_FCT( fd_fifo_new(&p->p_tosend, 5) ); 79 CHECK_FCT( fd_fifo_new(&p->p_tosend, 5) );
80 CHECK_FCT( fd_fifo_new(&p->p_tofailover, 0) );
80 p->p_hbh = lrand48(); 81 p->p_hbh = lrand48();
81 82
82 fd_list_init(&p->p_sr.srs, p); 83 fd_list_init(&p->p_sr.srs, p);
83 fd_list_init(&p->p_sr.exp, p); 84 fd_list_init(&p->p_sr.exp, p);
84 CHECK_POSIX( pthread_mutex_init(&p->p_sr.mtx, NULL) ); 85 CHECK_POSIX( pthread_mutex_init(&p->p_sr.mtx, NULL) );
230 struct fd_list * __li = ((struct fd_list *)(_l))->next; \ 231 struct fd_list * __li = ((struct fd_list *)(_l))->next; \
231 fd_list_unlink(__li); \ 232 fd_list_unlink(__li); \
232 free(__li); \ 233 free(__li); \
233 } 234 }
234 235
235 /* Empty the lists of p_tosend and p_sentreq messages */ 236 /* Empty the lists of p_tosend, p_tofailover, and p_sentreq messages */
236 void fd_peer_failover_msg(struct fd_peer * peer) 237 void fd_peer_failover_msg(struct fd_peer * peer)
237 { 238 {
238 struct msg *m; 239 struct msg *m;
239 TRACE_ENTRY("%p", peer); 240 TRACE_ENTRY("%p", peer);
240 CHECK_PARAMS_DO(CHECK_PEER(peer), return); 241 CHECK_PARAMS_DO(CHECK_PEER(peer), return);
255 /* fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Non-routable message freed during handover", fd_msg_pmdl_get(m)); */ 256 /* fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Non-routable message freed during handover", fd_msg_pmdl_get(m)); */
256 CHECK_FCT_DO(fd_msg_free(m), /* What can we do more? */) 257 CHECK_FCT_DO(fd_msg_free(m), /* What can we do more? */)
257 } 258 }
258 } 259 }
259 260
261 /* Requeue all messages in the "failover" queue */
262 while ( fd_fifo_tryget(peer->p_tofailover, &m) == 0 ) {
263 fd_hook_call(HOOK_MESSAGE_FAILOVER, m, peer, NULL, fd_msg_pmdl_get(m));
264 CHECK_FCT_DO(fd_fifo_post_noblock(fd_g_outgoing, (void *)&m),
265 {
266 /* fallback: destroy the message */
267 fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Internal error: unable to requeue this message during failover process", fd_msg_pmdl_get(m));
268 CHECK_FCT_DO(fd_msg_free(m), /* What can we do more? */)
269 } );
270 }
271
260 /* Requeue all routable sent requests */ 272 /* Requeue all routable sent requests */
261 fd_p_sr_failover(&peer->p_sr); 273 fd_p_sr_failover(&peer->p_sr);
262 274
263 /* Done */ 275 /* Done */
264 return; 276 return;
332 344
333 fd_list_unlink(&p->p_expiry); 345 fd_list_unlink(&p->p_expiry);
334 fd_list_unlink(&p->p_actives); 346 fd_list_unlink(&p->p_actives);
335 347
336 CHECK_FCT_DO( fd_fifo_del(&p->p_tosend), /* continue */ ); 348 CHECK_FCT_DO( fd_fifo_del(&p->p_tosend), /* continue */ );
349 CHECK_FCT_DO( fd_fifo_del(&p->p_tofailover), /* continue */ );
337 CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_state_mtx), /* continue */); 350 CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_state_mtx), /* continue */);
338 CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_sr.mtx), /* continue */); 351 CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_sr.mtx), /* continue */);
339 CHECK_POSIX_DO( pthread_cond_destroy(&p->p_sr.cnd), /* continue */); 352 CHECK_POSIX_DO( pthread_cond_destroy(&p->p_sr.cnd), /* continue */);
340 353
341 /* If the callback is still around... */ 354 /* If the callback is still around... */
"Welcome to our mercurial repository"