Mercurial > hg > freeDiameter
comparison libfdcore/p_out.c @ 1207:043b894b0511
Cleanups in failover situation to avoid deadlocks and corrupted message IDs. Tested OK now.
author | Sebastien Decugis <sdecugis@freediameter.net> |
---|---|
date | Fri, 14 Jun 2013 17:30:42 +0800 |
parents | 56c36d1007b4 |
children | 8f9684264fe0 |
comparison
equal
deleted
inserted
replaced
1206:ef7c5e39badf | 1207:043b894b0511 |
---|---|
72 } | 72 } |
73 | 73 |
74 /* Log the message */ | 74 /* Log the message */ |
75 fd_hook_call(HOOK_MESSAGE_SENT, cpy_for_logs_only, peer, NULL, fd_msg_pmdl_get(cpy_for_logs_only)); | 75 fd_hook_call(HOOK_MESSAGE_SENT, cpy_for_logs_only, peer, NULL, fd_msg_pmdl_get(cpy_for_logs_only)); |
76 | 76 |
77 pthread_cleanup_push((void *)fd_msg_free, *msg /* might be NULL, no problem */); | |
78 | |
77 /* Send the message */ | 79 /* Send the message */ |
78 CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz), ); | 80 CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz), ); |
81 | |
82 pthread_cleanup_pop(0); | |
83 | |
79 out: | 84 out: |
80 ; | 85 ; |
81 pthread_cleanup_pop(1); | 86 pthread_cleanup_pop(1); |
82 | 87 |
83 if (ret) | 88 if (ret) |
90 } | 95 } |
91 | 96 |
92 return 0; | 97 return 0; |
93 } | 98 } |
94 | 99 |
95 static void cleanup_requeue(void * arg) | |
96 { | |
97 struct msg *msg = arg; | |
98 CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &msg), | |
99 { | |
100 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "An error occurred while attempting to requeue this message during cancellation of the sending function", fd_msg_pmdl_get(msg)); | |
101 CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */); | |
102 } ); | |
103 } | |
104 | |
105 /* The code of the "out" thread */ | 100 /* The code of the "out" thread */ |
106 static void * out_thr(void * arg) | 101 static void * out_thr(void * arg) |
107 { | 102 { |
108 struct fd_peer * peer = arg; | 103 struct fd_peer * peer = arg; |
104 int stop = 0; | |
105 struct msg * msg; | |
109 ASSERT( CHECK_PEER(peer) ); | 106 ASSERT( CHECK_PEER(peer) ); |
110 | 107 |
111 /* Set the thread name */ | 108 /* Set the thread name */ |
112 { | 109 { |
113 char buf[48]; | 110 char buf[48]; |
114 snprintf(buf, sizeof(buf), "OUT/%s", peer->p_hdr.info.pi_diamid); | 111 snprintf(buf, sizeof(buf), "OUT/%s", peer->p_hdr.info.pi_diamid); |
115 fd_log_threadname ( buf ); | 112 fd_log_threadname ( buf ); |
116 } | 113 } |
117 | 114 |
118 /* Loop until cancelation */ | 115 /* Loop until cancelation */ |
119 while (1) { | 116 while (!stop) { |
120 struct msg * msg; | |
121 int ret; | 117 int ret; |
122 | 118 |
123 /* Retrieve next message to send */ | 119 /* Retrieve next message to send */ |
124 CHECK_FCT_DO( fd_fifo_get(peer->p_tosend, &msg), goto error ); | 120 CHECK_FCT_DO( fd_fifo_get(peer->p_tosend, &msg), goto error ); |
125 | |
126 /* Now if we are cancelled, we requeue this message */ | |
127 pthread_cleanup_push(cleanup_requeue, msg); | |
128 | 121 |
129 /* Send the message, log any error */ | 122 /* Send the message, log any error */ |
130 CHECK_FCT_DO( ret = do_send(&msg, peer->p_cnxctx, &peer->p_hbh, peer), | 123 CHECK_FCT_DO( ret = do_send(&msg, peer->p_cnxctx, &peer->p_hbh, peer), |
131 { | 124 { |
132 if (msg) { | 125 if (msg) { |
133 char buf[256]; | 126 char buf[256]; |
134 snprintf(buf, sizeof(buf), "Error while sending this message: %s", strerror(ret)); | 127 snprintf(buf, sizeof(buf), "Error while sending this message: %s", strerror(ret)); |
135 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, buf, fd_msg_pmdl_get(msg)); | 128 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, buf, fd_msg_pmdl_get(msg)); |
136 fd_msg_free(msg); | 129 fd_msg_free(msg); |
137 } | 130 } |
131 stop = 1; | |
138 } ); | 132 } ); |
139 | 133 |
140 /* Loop */ | 134 } |
141 pthread_cleanup_pop(0); | 135 |
142 } | 136 /* If we're here it means there was an error on the socket. We need to continue to purge the fifo & until we are canceled */ |
143 | 137 CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ ); |
138 | |
139 /* Requeue all routable messages in the global "out" queue, until we are canceled once the PSM deals with the CNX_ERROR sent above */ | |
140 while ( fd_fifo_get(peer->p_tosend, &msg) == 0 ) { | |
141 if (fd_msg_is_routable(msg)) { | |
142 CHECK_FCT_DO(fd_fifo_post_noblock(peer->p_tofailover, (void *)&msg), | |
143 { | |
144 /* fallback: destroy the message */ | |
145 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "Internal error: unable to requeue this message during failover process", fd_msg_pmdl_get(msg)); | |
146 CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */) | |
147 } ); | |
148 } else { | |
149 /* Just free it */ | |
150 /* fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Non-routable message freed during handover", fd_msg_pmdl_get(m)); */ | |
151 CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */) | |
152 } | |
153 } | |
154 | |
144 error: | 155 error: |
145 /* It is not really a connection error, but the effect is the same, we are not able to send anymore message */ | 156 /* It is not really a connection error, but the effect is the same, we are not able to send anymore message */ |
146 CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ ); | 157 CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ ); |
147 return NULL; | 158 return NULL; |
148 } | 159 } |