Changeset 1207:043b894b0511 in freeDiameter
- Timestamp:
- Jun 14, 2013, 6:30:42 PM (11 years ago)
- Branch:
- default
- Phase:
- public
- Location:
- libfdcore
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
libfdcore/fdcore-internal.h
r1186 r1207 184 184 /* Sent requests (for fallback), list of struct sentreq ordered by hbh */ 185 185 struct sr_list p_sr; 186 struct fifo *p_tofailover; 186 187 187 188 /* Pending received requests not yet answered (count only) */ -
libfdcore/p_out.c
r1186 r1207 75 75 fd_hook_call(HOOK_MESSAGE_SENT, cpy_for_logs_only, peer, NULL, fd_msg_pmdl_get(cpy_for_logs_only)); 76 76 77 pthread_cleanup_push((void *)fd_msg_free, *msg /* might be NULL, no problem */); 78 77 79 /* Send the message */ 78 80 CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz), ); 81 82 pthread_cleanup_pop(0); 83 79 84 out: 80 85 ; … … 93 98 } 94 99 95 static void cleanup_requeue(void * arg)96 {97 struct msg *msg = arg;98 CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &msg),99 {100 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "An error occurred while attempting to requeue this message during cancellation of the sending function", fd_msg_pmdl_get(msg));101 CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */);102 } );103 }104 105 100 /* The code of the "out" thread */ 106 101 static void * out_thr(void * arg) 107 102 { 108 103 struct fd_peer * peer = arg; 104 int stop = 0; 105 struct msg * msg; 109 106 ASSERT( CHECK_PEER(peer) ); 110 107 … … 117 114 118 115 /* Loop until cancelation */ 119 while (1) { 120 struct msg * msg; 116 while (!stop) { 121 117 int ret; 122 118 123 119 /* Retrieve next message to send */ 124 120 CHECK_FCT_DO( fd_fifo_get(peer->p_tosend, &msg), goto error ); 125 126 /* Now if we are cancelled, we requeue this message */127 pthread_cleanup_push(cleanup_requeue, msg);128 121 129 122 /* Send the message, log any error */ … … 136 129 fd_msg_free(msg); 137 130 } 131 stop = 1; 138 132 } ); 139 133 140 /* Loop */ 141 pthread_cleanup_pop(0); 142 } 143 134 } 135 136 /* If we're here it means there was an error on the socket. We need to continue to purge the fifo & until we are canceled */ 137 CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ ); 138 139 /* Requeue all routable messages in the global "out" queue, until we are canceled once the PSM deals with the CNX_ERROR sent above */ 140 while ( fd_fifo_get(peer->p_tosend, &msg) == 0 ) { 141 if (fd_msg_is_routable(msg)) { 142 CHECK_FCT_DO(fd_fifo_post_noblock(peer->p_tofailover, (void *)&msg), 143 { 144 /* fallback: destroy the message */ 145 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, "Internal error: unable to requeue this message during failover process", fd_msg_pmdl_get(msg)); 146 CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */) 147 } ); 148 } else { 149 /* Just free it */ 150 /* fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Non-routable message freed during handover", fd_msg_pmdl_get(m)); */ 151 CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */) 152 } 153 } 154 144 155 error: 145 156 /* It is not really a connection error, but the effect is the same, we are not able to send anymore message */ -
libfdcore/p_sr.c
r1188 r1207 38 38 /* Structure to store a sent request */ 39 39 struct sentreq { 40 struct fd_list chain; /* the "o" field points directly to the hop-by-hop of the request (uint32_t *) */40 struct fd_list chain; /* the "o" field points directly to the (new) hop-by-hop of the request (uint32_t *) */ 41 41 struct msg *req; /* A request that was sent and not yet answered. */ 42 uint32_t prevhbh;/* The value to set in the hbh header when the message is retrieved */42 uint32_t prevhbh;/* The value to set back in the hbh header when the message is retrieved */ 43 43 struct fd_list expire; /* the list of expiring requests */ 44 44 struct timespec added_on; /* the time the request was added */ … … 66 66 struct timespec now; 67 67 68 if (!TRACE_BOOL(ANNOYING)) 69 return; 70 71 fd_log_debug("%sSentReq list @%p:", text, srlist); 68 LOG_D("%sSentReq list @%p:", text, srlist); 72 69 73 70 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &now), ); … … 77 74 uint32_t * nexthbh = li->o; 78 75 79 fd_log_debug(" - Next req (hbh:%x): [since %ld.%06ld sec]", *nexthbh,76 LOG_D(" - Next req (hbh:0x%x, prev:0x%x): [since %ld.%06ld sec]", *nexthbh, sr->prevhbh, 80 77 (long)((now.tv_nsec >= sr->added_on.tv_nsec) ? (now.tv_sec - sr->added_on.tv_sec) : (now.tv_sec - sr->added_on.tv_sec - 1)), 81 78 (long)((now.tv_nsec >= sr->added_on.tv_nsec) ? ((now.tv_nsec - sr->added_on.tv_nsec) / 1000) : ((now.tv_nsec - sr->added_on.tv_nsec + 1000000000) / 1000))); … … 225 222 next = find_or_next(&srlist->srs, *hbhloc, &match); 226 223 if (match) { 227 TRACE_DEBUG(INFO, "A request with the same hop-by-hop Id was already sent: error");224 TRACE_DEBUG(INFO, "A request with the same hop-by-hop Id (0x%x) was already sent: error", *hbhloc); 228 225 free(sr); 226 srl_dump("Current list of SR: ", &srlist->srs); 229 227 CHECK_POSIX_DO( pthread_mutex_unlock(&srlist->mtx), /* ignore */ ); 230 228 return EINVAL; … … 235 233 fd_list_insert_before(next, &sr->chain); 236 234 srlist->cnt++; 237 srl_dump("Saved new request, ", &srlist->srs);238 235 239 236 /* In case of request with a timeout, also store in the timeout list */ … … 280 277 /* Search the request in the list */ 281 278 CHECK_POSIX( pthread_mutex_lock(&srlist->mtx) ); 282 srl_dump("Fetching a request, ", &srlist->srs);283 279 sr = (struct sentreq *)find_or_next(&srlist->srs, hbh, &match); 284 280 if (!match) { 285 281 TRACE_DEBUG(INFO, "There is no saved request with this hop-by-hop id (%x)", hbh); 282 srl_dump("Current list of SR: ", &srlist->srs); 286 283 *req = NULL; 287 284 } else { -
libfdcore/peers.c
r1201 r1207 78 78 fd_list_init(&p->p_expiry, p); 79 79 CHECK_FCT( fd_fifo_new(&p->p_tosend, 5) ); 80 CHECK_FCT( fd_fifo_new(&p->p_tofailover, 0) ); 80 81 p->p_hbh = lrand48(); 81 82 … … 233 234 } 234 235 235 /* Empty the lists of p_tosend and p_sentreq messages */236 /* Empty the lists of p_tosend, p_failover, and p_sentreq messages */ 236 237 void fd_peer_failover_msg(struct fd_peer * peer) 237 238 { … … 258 259 } 259 260 261 /* Requeue all messages in the "failover" queue */ 262 while ( fd_fifo_tryget(peer->p_tofailover, &m) == 0 ) { 263 fd_hook_call(HOOK_MESSAGE_FAILOVER, m, peer, NULL, fd_msg_pmdl_get(m)); 264 CHECK_FCT_DO(fd_fifo_post_noblock(fd_g_outgoing, (void *)&m), 265 { 266 /* fallback: destroy the message */ 267 fd_hook_call(HOOK_MESSAGE_DROPPED, m, NULL, "Internal error: unable to requeue this message during failover process", fd_msg_pmdl_get(m)); 268 CHECK_FCT_DO(fd_msg_free(m), /* What can we do more? */) 269 } ); 270 } 271 260 272 /* Requeue all routable sent requests */ 261 273 fd_p_sr_failover(&peer->p_sr); … … 335 347 336 348 CHECK_FCT_DO( fd_fifo_del(&p->p_tosend), /* continue */ ); 349 CHECK_FCT_DO( fd_fifo_del(&p->p_tofailover), /* continue */ ); 337 350 CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_state_mtx), /* continue */); 338 351 CHECK_POSIX_DO( pthread_mutex_destroy(&p->p_sr.mtx), /* continue */);
Note: See TracChangeset
for help on using the changeset viewer.