comparison libfdcore/p_psm.c @ 1379:9f52a841b0c7

Fix typos in comments and remove trailing whitespace.
author Thomas Klausner <tk@giga.or.at>
date Thu, 20 Jun 2019 12:07:56 +0200
parents e7726fae1e7f
children 566bb46cc73f
comparison
equal deleted inserted replaced
1378:86e231b3d6fc 1379:9f52a841b0c7
38 /* 38 /*
39 This file implements a Peer State Machine which is a mix of: 39 This file implements a Peer State Machine which is a mix of:
40 - the state machine described in rfc3588bis 40 - the state machine described in rfc3588bis
41 - the state machine described in rfc3539#section-3.4 41 - the state machine described in rfc3539#section-3.4
42 - the following observations. 42 - the following observations.
43 43
44 The delivery of Diameter messages must not always be unordered: order is important at 44 The delivery of Diameter messages must not always be unordered: order is important at
45 begining and end of a connection lifetime. It means we need agility to 45 beginning and end of a connection lifetime. It means we need agility to
46 switch between "ordering enforced" and "ordering not enforced to counter 46 switch between "ordering enforced" and "ordering not enforced to counter
47 Head of the Line Blocking" modes of operation. 47 Head of the Line Blocking" modes of operation.
48 48
49 The connection state machine represented in RFC3588 (and RFC6733) is 49 The connection state machine represented in RFC3588 (and RFC6733) is
50 incomplete, because it lacks the SUSPECT state and the 3 DWR/DWA 50 incomplete, because it lacks the SUSPECT state and the 3 DWR/DWA
51 exchanges (section 5.1) when the peer recovers from this state. 51 exchanges (section 5.1) when the peer recovers from this state.
52 Personnally I don't see the rationale for exchanging 3 messages (why 3?) 52 Personally I don't see the rationale for exchanging 3 messages (why 3?)
53 but, if we require at least 1 DWR/DWA exchange to be always performed 53 but, if we require at least 1 DWR/DWA exchange to be always performed
54 after the CER/CEA exchange (and initiated by the peer that sent the 54 after the CER/CEA exchange (and initiated by the peer that sent the
55 CEA), we have a simple way to deal with our ordering problem, as summarized 55 CEA), we have a simple way to deal with our ordering problem, as summarized
56 below. Peers are: [i]nitiator, [r]esponder. 56 below. Peers are: [i]nitiator, [r]esponder.
57 (1) [i] SCTP connection attempt. 57 (1) [i] SCTP connection attempt.
94 very short, while application messages can be quite large. Therefore, 94 very short, while application messages can be quite large. Therefore,
95 they require much more time to deliver. 95 they require much more time to deliver.
96 96
97 I really cannot see a way to counter this effect by using the ordering 97 I really cannot see a way to counter this effect by using the ordering
98 of the messages, except by applying a timer (state STATE_CLOSING_GRACE). 98 of the messages, except by applying a timer (state STATE_CLOSING_GRACE).
99 This timer can also be useful when we detect that some messages have not 99 This timer can also be useful when we detect that some messages have not
100 yet received an answer on this link, to give time to the application to 100 yet received an answer on this link, to give time to the application to
101 complete the exchange ongoing. 101 complete the exchange ongoing.
102 102
103 However, this problem must be balanced with the fact that the message 103 However, this problem must be balanced with the fact that the message
104 that is lost will be in many cases sent again as the failover mechanism 104 that is lost will be in many cases sent again as the failover mechanism
105 specifies. 105 specifies.
125 static int fd_psm_waitstart() 125 static int fd_psm_waitstart()
126 { 126 {
127 int ret = 0; 127 int ret = 0;
128 TRACE_ENTRY(""); 128 TRACE_ENTRY("");
129 CHECK_POSIX( pthread_mutex_lock(&started_mtx) ); 129 CHECK_POSIX( pthread_mutex_lock(&started_mtx) );
130 awake: 130 awake:
131 if (!ret && !started) { 131 if (!ret && !started) {
132 pthread_cleanup_push( fd_cleanup_mutex, &started_mtx ); 132 pthread_cleanup_push( fd_cleanup_mutex, &started_mtx );
133 CHECK_POSIX_DO( ret = pthread_cond_wait(&started_cnd, &started_mtx), ); 133 CHECK_POSIX_DO( ret = pthread_cond_wait(&started_cnd, &started_mtx), );
134 pthread_cleanup_pop( 0 ); 134 pthread_cleanup_pop( 0 );
135 goto awake; 135 goto awake;
157 /* Enter/leave OPEN state */ 157 /* Enter/leave OPEN state */
158 static int enter_open_state(struct fd_peer * peer) 158 static int enter_open_state(struct fd_peer * peer)
159 { 159 {
160 struct fd_list * li; 160 struct fd_list * li;
161 CHECK_PARAMS( FD_IS_LIST_EMPTY(&peer->p_actives) ); 161 CHECK_PARAMS( FD_IS_LIST_EMPTY(&peer->p_actives) );
162 162
163 /* Callback registered by the credential validator (fd_peer_validate_register) */ 163 /* Callback registered by the credential validator (fd_peer_validate_register) */
164 if (peer->p_cb2) { 164 if (peer->p_cb2) {
165 CHECK_FCT_DO( (*peer->p_cb2)(&peer->p_hdr.info), 165 CHECK_FCT_DO( (*peer->p_cb2)(&peer->p_hdr.info),
166 { 166 {
167 TRACE_DEBUG(FULL, "Validation failed, terminating the connection"); 167 TRACE_DEBUG(FULL, "Validation failed, terminating the connection");
168 fd_psm_terminate(peer, "DO_NOT_WANT_TO_TALK_TO_YOU" ); 168 fd_psm_terminate(peer, "DO_NOT_WANT_TO_TALK_TO_YOU" );
169 } ); 169 } );
170 peer->p_cb2 = NULL; 170 peer->p_cb2 = NULL;
171 return 0; 171 return 0;
172 } 172 }
173 173
174 /* Insert in the active peers list */ 174 /* Insert in the active peers list */
175 CHECK_POSIX( pthread_rwlock_wrlock(&fd_g_activ_peers_rw) ); 175 CHECK_POSIX( pthread_rwlock_wrlock(&fd_g_activ_peers_rw) );
176 for (li = fd_g_activ_peers.next; li != &fd_g_activ_peers; li = li->next) { 176 for (li = fd_g_activ_peers.next; li != &fd_g_activ_peers; li = li->next) {
177 struct fd_peer * next_p = (struct fd_peer *)li->o; 177 struct fd_peer * next_p = (struct fd_peer *)li->o;
178 int cmp = fd_os_cmp(peer->p_hdr.info.pi_diamid, peer->p_hdr.info.pi_diamidlen, 178 int cmp = fd_os_cmp(peer->p_hdr.info.pi_diamid, peer->p_hdr.info.pi_diamidlen,
179 next_p->p_hdr.info.pi_diamid, next_p->p_hdr.info.pi_diamidlen); 179 next_p->p_hdr.info.pi_diamid, next_p->p_hdr.info.pi_diamidlen);
180 if (cmp < 0) 180 if (cmp < 0)
181 break; 181 break;
182 } 182 }
183 fd_list_insert_before(li, &peer->p_actives); 183 fd_list_insert_before(li, &peer->p_actives);
184 CHECK_POSIX( pthread_rwlock_unlock(&fd_g_activ_peers_rw) ); 184 CHECK_POSIX( pthread_rwlock_unlock(&fd_g_activ_peers_rw) );
185 185
186 /* Callback registered when the peer was added, by fd_peer_add */ 186 /* Callback registered when the peer was added, by fd_peer_add */
187 if (peer->p_cb) { 187 if (peer->p_cb) {
188 TRACE_DEBUG(FULL, "Calling add callback for peer %s", peer->p_hdr.info.pi_diamid); 188 TRACE_DEBUG(FULL, "Calling add callback for peer %s", peer->p_hdr.info.pi_diamid);
189 (*peer->p_cb)(&peer->p_hdr.info, peer->p_cb_data); /* TODO: do this in a separate detached thread? */ 189 (*peer->p_cb)(&peer->p_hdr.info, peer->p_cb_data); /* TODO: do this in a separate detached thread? */
190 peer->p_cb = NULL; 190 peer->p_cb = NULL;
191 peer->p_cb_data = NULL; 191 peer->p_cb_data = NULL;
192 } 192 }
193 193
194 /* Start the thread to handle outgoing messages */ 194 /* Start the thread to handle outgoing messages */
195 CHECK_FCT( fd_out_start(peer) ); 195 CHECK_FCT( fd_out_start(peer) );
196 196
197 /* Update the expiry timer now */ 197 /* Update the expiry timer now */
198 CHECK_FCT( fd_p_expi_update(peer) ); 198 CHECK_FCT( fd_p_expi_update(peer) );
199 199
200 return 0; 200 return 0;
201 } 201 }
202 static int leave_open_state(struct fd_peer * peer, int skip_failover) 202 static int leave_open_state(struct fd_peer * peer, int skip_failover)
203 { 203 {
204 /* Remove from active peers list */ 204 /* Remove from active peers list */
205 CHECK_POSIX( pthread_rwlock_wrlock(&fd_g_activ_peers_rw) ); 205 CHECK_POSIX( pthread_rwlock_wrlock(&fd_g_activ_peers_rw) );
206 fd_list_unlink( &peer->p_actives ); 206 fd_list_unlink( &peer->p_actives );
207 CHECK_POSIX( pthread_rwlock_unlock(&fd_g_activ_peers_rw) ); 207 CHECK_POSIX( pthread_rwlock_unlock(&fd_g_activ_peers_rw) );
208 208
209 /* Stop the "out" thread */ 209 /* Stop the "out" thread */
210 CHECK_FCT( fd_out_stop(peer) ); 210 CHECK_FCT( fd_out_stop(peer) );
211 211
212 /* Failover the messages */ 212 /* Failover the messages */
213 if (!skip_failover) { 213 if (!skip_failover) {
214 fd_peer_failover_msg(peer); 214 fd_peer_failover_msg(peer);
215 } 215 }
216 216
217 return 0; 217 return 0;
218 } 218 }
219 219
220 220
221 /************************************************************************/ 221 /************************************************************************/
231 switch (ev->code) { 231 switch (ev->code) {
232 case FDEVP_CNX_ESTABLISHED: { 232 case FDEVP_CNX_ESTABLISHED: {
233 fd_cnx_destroy(ev->data); 233 fd_cnx_destroy(ev->data);
234 } 234 }
235 break; 235 break;
236 236
237 case FDEVP_TERMINATE: 237 case FDEVP_TERMINATE:
238 /* Do not free the string since it is a constant */ 238 /* Do not free the string since it is a constant */
239 break; 239 break;
240 240
241 case FDEVP_CNX_INCOMING: { 241 case FDEVP_CNX_INCOMING: {
242 struct cnx_incoming * evd = ev->data; 242 struct cnx_incoming * evd = ev->data;
243 fd_hook_call(HOOK_MESSAGE_DROPPED, evd->cer, NULL, "Message discarded while cleaning peer state machine queue.", fd_msg_pmdl_get(evd->cer)); 243 fd_hook_call(HOOK_MESSAGE_DROPPED, evd->cer, NULL, "Message discarded while cleaning peer state machine queue.", fd_msg_pmdl_get(evd->cer));
244 CHECK_FCT_DO( fd_msg_free(evd->cer), /* continue */); 244 CHECK_FCT_DO( fd_msg_free(evd->cer), /* continue */);
245 fd_cnx_destroy(evd->cnx); 245 fd_cnx_destroy(evd->cnx);
253 253
254 /* Read state */ 254 /* Read state */
255 int fd_peer_get_state(struct peer_hdr *peer) 255 int fd_peer_get_state(struct peer_hdr *peer)
256 { 256 {
257 int ret; 257 int ret;
258 258
259 struct fd_peer * p = (struct fd_peer *)peer; 259 struct fd_peer * p = (struct fd_peer *)peer;
260 260
261 if (!CHECK_PEER(p)) 261 if (!CHECK_PEER(p))
262 return -1; 262 return -1;
263 263
264 CHECK_POSIX_DO( pthread_mutex_lock(&p->p_state_mtx), return -1 ); 264 CHECK_POSIX_DO( pthread_mutex_lock(&p->p_state_mtx), return -1 );
265 ret = p->p_state; 265 ret = p->p_state;
266 CHECK_POSIX_DO( pthread_mutex_unlock(&p->p_state_mtx), return -1 ); 266 CHECK_POSIX_DO( pthread_mutex_unlock(&p->p_state_mtx), return -1 );
267 267
268 return ret; 268 return ret;
269 } 269 }
270 270
271 271
272 /* Change state */ 272 /* Change state */
273 int fd_psm_change_state(struct fd_peer * peer, int new_state) 273 int fd_psm_change_state(struct fd_peer * peer, int new_state)
274 { 274 {
275 int old; 275 int old;
276 276
277 TRACE_ENTRY("%p %d(%s)", peer, new_state, STATE_STR(new_state)); 277 TRACE_ENTRY("%p %d(%s)", peer, new_state, STATE_STR(new_state));
278 CHECK_PARAMS( CHECK_PEER(peer) ); 278 CHECK_PARAMS( CHECK_PEER(peer) );
279 279
280 old = fd_peer_getstate(peer); 280 old = fd_peer_getstate(peer);
281 if (old == new_state) 281 if (old == new_state)
282 return 0; 282 return 0;
283 283
284 LOG(((old == STATE_OPEN) || (new_state == STATE_OPEN)) ? ((new_state == STATE_SUSPECT || new_state == STATE_CLOSED) ? FD_LOG_ERROR : FD_LOG_NOTICE ): FD_LOG_DEBUG, "'%s'\t-> '%s'\t'%s'", 284 LOG(((old == STATE_OPEN) || (new_state == STATE_OPEN)) ? ((new_state == STATE_SUSPECT || new_state == STATE_CLOSED) ? FD_LOG_ERROR : FD_LOG_NOTICE ): FD_LOG_DEBUG, "'%s'\t-> '%s'\t'%s'",
285 STATE_STR(old), 285 STATE_STR(old),
286 STATE_STR(new_state), 286 STATE_STR(new_state),
287 peer->p_hdr.info.pi_diamid); 287 peer->p_hdr.info.pi_diamid);
288 288
289 289
290 CHECK_POSIX( pthread_mutex_lock(&peer->p_state_mtx) ); 290 CHECK_POSIX( pthread_mutex_lock(&peer->p_state_mtx) );
291 peer->p_state = new_state; 291 peer->p_state = new_state;
292 CHECK_POSIX( pthread_mutex_unlock(&peer->p_state_mtx) ); 292 CHECK_POSIX( pthread_mutex_unlock(&peer->p_state_mtx) );
293 293
294 if (old == STATE_OPEN) { 294 if (old == STATE_OPEN) {
295 CHECK_FCT( leave_open_state(peer, new_state == STATE_CLOSING_GRACE) ); 295 CHECK_FCT( leave_open_state(peer, new_state == STATE_CLOSING_GRACE) );
296 } 296 }
297 if (old == STATE_CLOSING_GRACE) { 297 if (old == STATE_CLOSING_GRACE) {
298 fd_peer_failover_msg(peer); 298 fd_peer_failover_msg(peer);
299 } 299 }
300 300
301 if (new_state == STATE_OPEN) { 301 if (new_state == STATE_OPEN) {
302 CHECK_FCT( enter_open_state(peer) ); 302 CHECK_FCT( enter_open_state(peer) );
303 } 303 }
304 304
305 if (new_state == STATE_CLOSED) { 305 if (new_state == STATE_CLOSED) {
306 /* Purge event list */ 306 /* Purge event list */
307 fd_psm_events_free(peer); 307 fd_psm_events_free(peer);
308 308
309 /* Reset the counter of pending anwers to send */ 309 /* Reset the counter of pending answers to send */
310 peer->p_reqin_count = 0; 310 peer->p_reqin_count = 0;
311 311
312 /* If the peer is not persistant, we destroy it */ 312 /* If the peer is not persistent, we destroy it */
313 if (peer->p_hdr.info.config.pic_flags.persist == PI_PRST_NONE) { 313 if (peer->p_hdr.info.config.pic_flags.persist == PI_PRST_NONE) {
314 CHECK_FCT( fd_event_send(peer->p_events, FDEVP_TERMINATE, 0, NULL) ); 314 CHECK_FCT( fd_event_send(peer->p_events, FDEVP_TERMINATE, 0, NULL) );
315 } 315 }
316 } 316 }
317 317
318 return 0; 318 return 0;
319 } 319 }
320 320
321 /* Set timeout timer of next event */ 321 /* Set timeout timer of next event */
322 void fd_psm_next_timeout(struct fd_peer * peer, int add_random, int delay) 322 void fd_psm_next_timeout(struct fd_peer * peer, int add_random, int delay)
323 { 323 {
324 TRACE_DEBUG(FULL, "Peer timeout reset to %d seconds%s", delay, add_random ? " (+/- 2)" : "" ); 324 TRACE_DEBUG(FULL, "Peer timeout reset to %d seconds%s", delay, add_random ? " (+/- 2)" : "" );
325 325
326 /* Initialize the timer */ 326 /* Initialize the timer */
327 CHECK_POSIX_DO( clock_gettime( CLOCK_REALTIME, &peer->p_psm_timer ), ASSERT(0) ); 327 CHECK_POSIX_DO( clock_gettime( CLOCK_REALTIME, &peer->p_psm_timer ), ASSERT(0) );
328 328
329 if (add_random) { 329 if (add_random) {
330 if (delay > 2) 330 if (delay > 2)
331 delay -= 2; 331 delay -= 2;
332 else 332 else
333 delay = 0; 333 delay = 0;
338 if (peer->p_psm_timer.tv_nsec >= 1000000000L) { 338 if (peer->p_psm_timer.tv_nsec >= 1000000000L) {
339 peer->p_psm_timer.tv_nsec -= 1000000000L; 339 peer->p_psm_timer.tv_nsec -= 1000000000L;
340 peer->p_psm_timer.tv_sec ++; 340 peer->p_psm_timer.tv_sec ++;
341 } 341 }
342 } 342 }
343 343
344 peer->p_psm_timer.tv_sec += delay; 344 peer->p_psm_timer.tv_sec += delay;
345 345
346 #ifdef SLOW_PSM 346 #ifdef SLOW_PSM
347 /* temporary for debug */ 347 /* temporary for debug */
348 peer->p_psm_timer.tv_sec += 10; 348 peer->p_psm_timer.tv_sec += 10;
349 #endif 349 #endif
350 } 350 }
354 { 354 {
355 /* Move to CLOSED state: failover messages, stop OUT thread, unlink peer from active list */ 355 /* Move to CLOSED state: failover messages, stop OUT thread, unlink peer from active list */
356 if (fd_peer_getstate(peer) != STATE_ZOMBIE) { 356 if (fd_peer_getstate(peer) != STATE_ZOMBIE) {
357 CHECK_FCT_DO( fd_psm_change_state(peer, STATE_CLOSED), /* continue */ ); 357 CHECK_FCT_DO( fd_psm_change_state(peer, STATE_CLOSED), /* continue */ );
358 } 358 }
359 359
360 fd_p_cnx_abort(peer, terminate); 360 fd_p_cnx_abort(peer, terminate);
361 361
362 fd_p_ce_clear_cnx(peer, NULL); 362 fd_p_ce_clear_cnx(peer, NULL);
363 363
364 if (peer->p_receiver) { 364 if (peer->p_receiver) {
365 fd_cnx_destroy(peer->p_receiver); 365 fd_cnx_destroy(peer->p_receiver);
366 peer->p_receiver = NULL; 366 peer->p_receiver = NULL;
367 } 367 }
368 368
369 if (terminate) { 369 if (terminate) {
370 fd_psm_events_free(peer); 370 fd_psm_events_free(peer);
371 CHECK_FCT_DO( fd_fifo_del(&peer->p_events), /* continue */ ); 371 CHECK_FCT_DO( fd_fifo_del(&peer->p_events), /* continue */ );
372 } 372 }
373 373
374 } 374 }
375 375
376 376
377 /************************************************************************/ 377 /************************************************************************/
378 /* The PSM thread */ 378 /* The PSM thread */
379 /************************************************************************/ 379 /************************************************************************/
380 /* Cancelation cleanup : set ZOMBIE state in the peer */ 380 /* Cancellation cleanup : set ZOMBIE state in the peer */
381 void cleanup_setstate(void * arg) 381 void cleanup_setstate(void * arg)
382 { 382 {
383 struct fd_peer * peer = (struct fd_peer *)arg; 383 struct fd_peer * peer = (struct fd_peer *)arg;
384 CHECK_PARAMS_DO( CHECK_PEER(peer), return ); 384 CHECK_PARAMS_DO( CHECK_PEER(peer), return );
385 CHECK_POSIX_DO( pthread_mutex_lock(&peer->p_state_mtx), ); 385 CHECK_POSIX_DO( pthread_mutex_lock(&peer->p_state_mtx), );
386 peer->p_state = STATE_ZOMBIE; 386 peer->p_state = STATE_ZOMBIE;
387 CHECK_POSIX_DO( pthread_mutex_unlock(&peer->p_state_mtx), ); 387 CHECK_POSIX_DO( pthread_mutex_unlock(&peer->p_state_mtx), );
388 return; 388 return;
389 } 389 }
390 390
391 /* The state machine thread (controler) */ 391 /* The state machine thread (controller) */
392 static void * p_psm_th( void * arg ) 392 static void * p_psm_th( void * arg )
393 { 393 {
394 struct fd_peer * peer = (struct fd_peer *)arg; 394 struct fd_peer * peer = (struct fd_peer *)arg;
395 int created_started = started ? 1 : 0; 395 int created_started = started ? 1 : 0;
396 int event; 396 int event;
397 size_t ev_sz; 397 size_t ev_sz;
398 void * ev_data; 398 void * ev_data;
399 int cur_state; 399 int cur_state;
400 400
401 CHECK_PARAMS_DO( CHECK_PEER(peer), ASSERT(0) ); 401 CHECK_PARAMS_DO( CHECK_PEER(peer), ASSERT(0) );
402 402
403 pthread_cleanup_push( cleanup_setstate, arg ); 403 pthread_cleanup_push( cleanup_setstate, arg );
404 404
405 /* Set the thread name */ 405 /* Set the thread name */
406 { 406 {
407 char buf[48]; 407 char buf[48];
408 snprintf(buf, sizeof(buf), "PSM/%s", peer->p_hdr.info.pi_diamid); 408 snprintf(buf, sizeof(buf), "PSM/%s", peer->p_hdr.info.pi_diamid);
409 fd_log_threadname ( buf ); 409 fd_log_threadname ( buf );
410 } 410 }
411 411
412 /* The state machine starts in CLOSED state */ 412 /* The state machine starts in CLOSED state */
413 CHECK_POSIX_DO( pthread_mutex_lock(&peer->p_state_mtx), goto psm_end ); 413 CHECK_POSIX_DO( pthread_mutex_lock(&peer->p_state_mtx), goto psm_end );
414 peer->p_state = STATE_CLOSED; 414 peer->p_state = STATE_CLOSED;
415 CHECK_POSIX_DO( pthread_mutex_unlock(&peer->p_state_mtx), goto psm_end ); 415 CHECK_POSIX_DO( pthread_mutex_unlock(&peer->p_state_mtx), goto psm_end );
416 416
417 /* Wait until the PSMs are authorized to start in the daemon */ 417 /* Wait until the PSMs are authorized to start in the daemon */
418 CHECK_FCT_DO( fd_psm_waitstart(), goto psm_end ); 418 CHECK_FCT_DO( fd_psm_waitstart(), goto psm_end );
419 419
420 /* Initialize the timer */ 420 /* Initialize the timer */
421 if (peer->p_flags.pf_responder) { 421 if (peer->p_flags.pf_responder) {
422 fd_psm_next_timeout(peer, 0, INCNX_TIMEOUT); 422 fd_psm_next_timeout(peer, 0, INCNX_TIMEOUT);
423 } else { 423 } else {
424 fd_psm_next_timeout(peer, created_started, 0); 424 fd_psm_next_timeout(peer, created_started, 0);
425 } 425 }
426 426
427 psm_loop: 427 psm_loop:
428 /* Get next event */ 428 /* Get next event */
429 TRACE_DEBUG(FULL, "'%s' in state '%s' waiting for next event.", 429 TRACE_DEBUG(FULL, "'%s' in state '%s' waiting for next event.",
430 peer->p_hdr.info.pi_diamid, STATE_STR(fd_peer_getstate(peer))); 430 peer->p_hdr.info.pi_diamid, STATE_STR(fd_peer_getstate(peer)));
431 CHECK_FCT_DO( fd_event_timedget(peer->p_events, &peer->p_psm_timer, FDEVP_PSM_TIMEOUT, &event, &ev_sz, &ev_data), goto psm_end ); 431 CHECK_FCT_DO( fd_event_timedget(peer->p_events, &peer->p_psm_timer, FDEVP_PSM_TIMEOUT, &event, &ev_sz, &ev_data), goto psm_end );
432 432
433 cur_state = fd_peer_getstate(peer); 433 cur_state = fd_peer_getstate(peer);
434 if (cur_state == -1) 434 if (cur_state == -1)
435 goto psm_end; 435 goto psm_end;
436 436
437 TRACE_DEBUG(FULL, "'%s'\t<-- '%s'\t(%p,%zd)\t'%s'", 437 TRACE_DEBUG(FULL, "'%s'\t<-- '%s'\t(%p,%zd)\t'%s'",
438 STATE_STR(cur_state), 438 STATE_STR(cur_state),
439 fd_pev_str(event), ev_data, ev_sz, 439 fd_pev_str(event), ev_data, ev_sz,
440 peer->p_hdr.info.pi_diamid); 440 peer->p_hdr.info.pi_diamid);
441 441
460 case STATE_OPEN_NEW: 460 case STATE_OPEN_NEW:
461 case STATE_REOPEN: 461 case STATE_REOPEN:
462 /* We cannot just close the connection, we have to send a DPR first */ 462 /* We cannot just close the connection, we have to send a DPR first */
463 CHECK_FCT_DO( fd_p_dp_initiate(peer, ev_data), goto psm_end ); 463 CHECK_FCT_DO( fd_p_dp_initiate(peer, ev_data), goto psm_end );
464 goto psm_loop; 464 goto psm_loop;
465 465
466 /* 466 /*
467 case STATE_CLOSING: 467 case STATE_CLOSING:
468 case STATE_CLOSING_GRACE: 468 case STATE_CLOSING_GRACE:
469 case STATE_WAITCNXACK: 469 case STATE_WAITCNXACK:
470 case STATE_WAITCNXACK_ELEC: 470 case STATE_WAITCNXACK_ELEC:
471 case STATE_WAITCEA: 471 case STATE_WAITCEA:
475 default: 475 default:
476 /* In these cases, we just cleanup the peer object (if needed) and terminate */ 476 /* In these cases, we just cleanup the peer object (if needed) and terminate */
477 goto psm_end; 477 goto psm_end;
478 } 478 }
479 } 479 }
480 480
481 /* A message was received */ 481 /* A message was received */
482 if (event == FDEVP_CNX_MSG_RECV) { 482 if (event == FDEVP_CNX_MSG_RECV) {
483 struct msg * msg = NULL; 483 struct msg * msg = NULL;
484 struct msg_hdr * hdr; 484 struct msg_hdr * hdr;
485 struct fd_cnx_rcvdata rcv_data; 485 struct fd_cnx_rcvdata rcv_data;
486 struct fd_msg_pmdl * pmdl = NULL; 486 struct fd_msg_pmdl * pmdl = NULL;
487 487
488 rcv_data.buffer = ev_data; 488 rcv_data.buffer = ev_data;
489 rcv_data.length = ev_sz; 489 rcv_data.length = ev_sz;
490 pmdl = fd_msg_pmdl_get_inbuf(rcv_data.buffer, rcv_data.length); 490 pmdl = fd_msg_pmdl_get_inbuf(rcv_data.buffer, rcv_data.length);
491 491
492 /* Parse the received buffer */ 492 /* Parse the received buffer */
493 CHECK_FCT_DO( fd_msg_parse_buffer( (void *)&ev_data, ev_sz, &msg), 493 CHECK_FCT_DO( fd_msg_parse_buffer( (void *)&ev_data, ev_sz, &msg),
494 { 494 {
495 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, NULL, peer, &rcv_data, pmdl ); 495 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, NULL, peer, &rcv_data, pmdl );
496 free(ev_data); 496 free(ev_data);
497 CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), goto psm_reset ); 497 CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), goto psm_reset );
498 goto psm_loop; 498 goto psm_loop;
499 } ); 499 } );
500 500
501 fd_hook_associate(msg, pmdl); 501 fd_hook_associate(msg, pmdl);
502 CHECK_FCT_DO( fd_msg_source_set( msg, peer->p_hdr.info.pi_diamid, peer->p_hdr.info.pi_diamidlen), goto psm_end); 502 CHECK_FCT_DO( fd_msg_source_set( msg, peer->p_hdr.info.pi_diamid, peer->p_hdr.info.pi_diamidlen), goto psm_end);
503 503
504 /* If the current state does not allow receiving messages, just drop it */ 504 /* If the current state does not allow receiving messages, just drop it */
505 if (cur_state == STATE_CLOSED) { 505 if (cur_state == STATE_CLOSED) {
506 /* In such case, just discard the message */ 506 /* In such case, just discard the message */
507 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, peer, "Message purged from queue, peer in CLOSED state", fd_msg_pmdl_get(msg)); 507 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, peer, "Message purged from queue, peer in CLOSED state", fd_msg_pmdl_get(msg));
508 fd_msg_free(msg); 508 fd_msg_free(msg);
509 goto psm_loop; 509 goto psm_loop;
510 } 510 }
511 511
512 /* Extract the header */ 512 /* Extract the header */
513 CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto psm_end ); 513 CHECK_FCT_DO( fd_msg_hdr(msg, &hdr), goto psm_end );
514 514
515 /* If it is an answer, associate with the request or drop */ 515 /* If it is an answer, associate with the request or drop */
516 if (!(hdr->msg_flags & CMD_FLAG_REQUEST)) { 516 if (!(hdr->msg_flags & CMD_FLAG_REQUEST)) {
517 struct msg * req; 517 struct msg * req;
518 /* Search matching request (same hbhid) */ 518 /* Search matching request (same hbhid) */
519 CHECK_FCT_DO( fd_p_sr_fetch(&peer->p_sr, hdr->msg_hbhid, &req), goto psm_end ); 519 CHECK_FCT_DO( fd_p_sr_fetch(&peer->p_sr, hdr->msg_hbhid, &req), goto psm_end );
520 if (req == NULL) { 520 if (req == NULL) {
521 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, peer, "Answer received with no corresponding sent request.", fd_msg_pmdl_get(msg)); 521 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, peer, "Answer received with no corresponding sent request.", fd_msg_pmdl_get(msg));
522 fd_msg_free(msg); 522 fd_msg_free(msg);
523 goto psm_loop; 523 goto psm_loop;
524 } 524 }
525 525
526 /* Associate */ 526 /* Associate */
527 CHECK_FCT_DO( fd_msg_answ_associate( msg, req ), goto psm_end ); 527 CHECK_FCT_DO( fd_msg_answ_associate( msg, req ), goto psm_end );
528 528
529 } 529 }
530 530
531 /* Log incoming message */ 531 /* Log incoming message */
532 fd_hook_call(HOOK_MESSAGE_RECEIVED, msg, peer, NULL, fd_msg_pmdl_get(msg)); 532 fd_hook_call(HOOK_MESSAGE_RECEIVED, msg, peer, NULL, fd_msg_pmdl_get(msg));
533 533
534 if (cur_state == STATE_OPEN_NEW) { 534 if (cur_state == STATE_OPEN_NEW) {
535 /* OK, we have received something, so the connection is supposedly now in OPEN state at the remote site */ 535 /* OK, we have received something, so the connection is supposedly now in OPEN state at the remote site */
536 fd_psm_change_state(peer, STATE_OPEN ); 536 fd_psm_change_state(peer, STATE_OPEN );
537 } 537 }
538 538
539 /* Now handle non-link-local messages */ 539 /* Now handle non-link-local messages */
540 if (fd_msg_is_routable(msg)) { 540 if (fd_msg_is_routable(msg)) {
541 switch (cur_state) { 541 switch (cur_state) {
542 /* To maximize compatibility -- should not be a security issue here */ 542 /* To maximize compatibility -- should not be a security issue here */
543 case STATE_REOPEN: 543 case STATE_REOPEN:
560 /* Mark the incoming request so that we know we have pending answers for this peer */ 560 /* Mark the incoming request so that we know we have pending answers for this peer */
561 CHECK_POSIX_DO( pthread_mutex_lock(&peer->p_state_mtx), goto psm_end ); 561 CHECK_POSIX_DO( pthread_mutex_lock(&peer->p_state_mtx), goto psm_end );
562 peer->p_reqin_count++; 562 peer->p_reqin_count++;
563 CHECK_POSIX_DO( pthread_mutex_unlock(&peer->p_state_mtx), goto psm_end ); 563 CHECK_POSIX_DO( pthread_mutex_unlock(&peer->p_state_mtx), goto psm_end );
564 } 564 }
565 565
566 /* Requeue to the global incoming queue */ 566 /* Requeue to the global incoming queue */
567 CHECK_FCT_DO(fd_fifo_post(fd_g_incoming, &msg), goto psm_end ); 567 CHECK_FCT_DO(fd_fifo_post(fd_g_incoming, &msg), goto psm_end );
568 568
569 /* Update the peer timer (only in OPEN state) */ 569 /* Update the peer timer (only in OPEN state) */
570 if ((cur_state == STATE_OPEN) && (!peer->p_flags.pf_dw_pending)) { 570 if ((cur_state == STATE_OPEN) && (!peer->p_flags.pf_dw_pending)) {
571 fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_twtimer ?: fd_g_config->cnf_timer_tw); 571 fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_twtimer ?: fd_g_config->cnf_timer_tw);
572 } 572 }
573 break; 573 break;
574 574
575 /* In other states, we discard the message, it is either old or invalid to send it for the remote peer */ 575 /* In other states, we discard the message, it is either old or invalid to send it for the remote peer */
576 case STATE_WAITCNXACK: 576 case STATE_WAITCNXACK:
577 case STATE_WAITCNXACK_ELEC: 577 case STATE_WAITCNXACK_ELEC:
578 case STATE_WAITCEA: 578 case STATE_WAITCEA:
579 case STATE_CLOSED: 579 case STATE_CLOSED:
585 fd_msg_free(msg); 585 fd_msg_free(msg);
586 } 586 }
587 } 587 }
588 goto psm_loop; 588 goto psm_loop;
589 } 589 }
590 590
591 /* Link-local message: They must be understood by our dictionary, otherwise we return an error */ 591 /* Link-local message: They must be understood by our dictionary, otherwise we return an error */
592 { 592 {
593 struct msg * error = NULL; 593 struct msg * error = NULL;
594 int ret = fd_msg_parse_or_error( &msg, &error ); 594 int ret = fd_msg_parse_or_error( &msg, &error );
595 if (ret != EBADMSG) { 595 if (ret != EBADMSG) {
596 CHECK_FCT_DO( ret, 596 CHECK_FCT_DO( ret,
597 { 597 {
598 char buf[256]; 598 char buf[256];
599 snprintf(buf, sizeof(buf), "%s: An unexpected error occurred while parsing a link-local message", peer->p_hdr.info.pi_diamid); 599 snprintf(buf, sizeof(buf), "%s: An unexpected error occurred while parsing a link-local message", peer->p_hdr.info.pi_diamid);
600 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, peer, buf, fd_msg_pmdl_get(msg)); 600 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, peer, buf, fd_msg_pmdl_get(msg));
601 fd_msg_free(msg); 601 fd_msg_free(msg);
602 goto psm_end; 602 goto psm_end;
603 } ); 603 } );
604 } else { 604 } else {
605 if (msg == NULL) { 605 if (msg == NULL) {
606 /* Send the error back to the peer */ 606 /* Send the error back to the peer */
607 CHECK_FCT_DO( ret = fd_out_send(&error, NULL, peer, 0), ); 607 CHECK_FCT_DO( ret = fd_out_send(&error, NULL, peer, 0), );
608 if (error) { 608 if (error) {
609 char buf[256]; 609 char buf[256];
610 /* Only if an error occurred & the message was not saved / dumped */ 610 /* Only if an error occurred & the message was not saved / dumped */
611 snprintf(buf, sizeof(buf), "%s: error sending a message", peer->p_hdr.info.pi_diamid); 611 snprintf(buf, sizeof(buf), "%s: error sending a message", peer->p_hdr.info.pi_diamid);
612 fd_hook_call(HOOK_MESSAGE_DROPPED, error, peer, buf, fd_msg_pmdl_get(error)); 612 fd_hook_call(HOOK_MESSAGE_DROPPED, error, peer, buf, fd_msg_pmdl_get(error));
613 CHECK_FCT_DO( fd_msg_free(error), goto psm_end); 613 CHECK_FCT_DO( fd_msg_free(error), goto psm_end);
614 } 614 }
615 } else { 615 } else {
616 char buf[256]; 616 char buf[256];
621 CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), goto psm_reset ); 621 CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), goto psm_reset );
622 } 622 }
623 goto psm_loop; 623 goto psm_loop;
624 } 624 }
625 } 625 }
626 626
627 /* Handle the LL message and update the expiry timer appropriately */ 627 /* Handle the LL message and update the expiry timer appropriately */
628 switch (hdr->msg_code) { 628 switch (hdr->msg_code) {
629 case CC_CAPABILITIES_EXCHANGE: 629 case CC_CAPABILITIES_EXCHANGE:
630 CHECK_FCT_DO( fd_p_ce_msgrcv(&msg, (hdr->msg_flags & CMD_FLAG_REQUEST), peer), 630 CHECK_FCT_DO( fd_p_ce_msgrcv(&msg, (hdr->msg_flags & CMD_FLAG_REQUEST), peer),
631 { 631 {
632 if (msg) 632 if (msg)
633 CHECK_FCT_DO( fd_msg_free(msg), ); 633 CHECK_FCT_DO( fd_msg_free(msg), );
634 goto psm_reset; 634 goto psm_reset;
635 } ); 635 } );
636 break; 636 break;
637 637
638 case CC_DISCONNECT_PEER: 638 case CC_DISCONNECT_PEER:
639 CHECK_FCT_DO( fd_p_dp_handle(&msg, (hdr->msg_flags & CMD_FLAG_REQUEST), peer), goto psm_reset ); 639 CHECK_FCT_DO( fd_p_dp_handle(&msg, (hdr->msg_flags & CMD_FLAG_REQUEST), peer), goto psm_reset );
640 if (fd_peer_getstate(peer) == STATE_CLOSING) 640 if (fd_peer_getstate(peer) == STATE_CLOSING)
641 goto psm_end; 641 goto psm_end;
642 642
643 break; 643 break;
644 644
645 case CC_DEVICE_WATCHDOG: 645 case CC_DEVICE_WATCHDOG:
646 CHECK_FCT_DO( fd_p_dw_handle(&msg, (hdr->msg_flags & CMD_FLAG_REQUEST), peer), goto psm_reset ); 646 CHECK_FCT_DO( fd_p_dw_handle(&msg, (hdr->msg_flags & CMD_FLAG_REQUEST), peer), goto psm_reset );
647 break; 647 break;
648 648
649 default: 649 default:
650 /* Unknown / unexpected / invalid message -- but validated by our dictionary */ 650 /* Unknown / unexpected / invalid message -- but validated by our dictionary */
651 TRACE_DEBUG(INFO, "Invalid non-routable command received: %u.", hdr->msg_code); 651 TRACE_DEBUG(INFO, "Invalid non-routable command received: %u.", hdr->msg_code);
652 if (hdr->msg_flags & CMD_FLAG_REQUEST) { 652 if (hdr->msg_flags & CMD_FLAG_REQUEST) {
653 do { 653 do {
662 } while (0); 662 } while (0);
663 } else { 663 } else {
664 /* We did ASK for it ??? */ 664 /* We did ASK for it ??? */
665 TRACE_DEBUG(INFO, "Received answer with erroneous 'is_routable' result..."); 665 TRACE_DEBUG(INFO, "Received answer with erroneous 'is_routable' result...");
666 } 666 }
667 667
668 /* Cleanup the message if not done */ 668 /* Cleanup the message if not done */
669 if (msg) { 669 if (msg) {
670 char buf[256]; 670 char buf[256];
671 snprintf(buf, sizeof(buf), "Received un-handled non-routable command from peer '%s'.", peer->p_hdr.info.pi_diamid); 671 snprintf(buf, sizeof(buf), "Received un-handled non-routable command from peer '%s'.", peer->p_hdr.info.pi_diamid);
672 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, buf, fd_msg_pmdl_get(msg)); 672 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, buf, fd_msg_pmdl_get(msg));
673 CHECK_FCT_DO( fd_msg_free(msg), /* continue */); 673 CHECK_FCT_DO( fd_msg_free(msg), /* continue */);
674 msg = NULL; 674 msg = NULL;
675 } 675 }
676 }; 676 };
677 677
678 /* At this point the message must have been fully handled already */ 678 /* At this point the message must have been fully handled already */
679 if (msg) { 679 if (msg) {
680 char buf[256]; 680 char buf[256];
681 snprintf(buf, sizeof(buf), "Internal error ('%s'): unhandled message.", peer->p_hdr.info.pi_diamid); 681 snprintf(buf, sizeof(buf), "Internal error ('%s'): unhandled message.", peer->p_hdr.info.pi_diamid);
682 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, buf, fd_msg_pmdl_get(msg)); 682 fd_hook_call(HOOK_MESSAGE_DROPPED, msg, NULL, buf, fd_msg_pmdl_get(msg));
683 fd_msg_free(msg); 683 fd_msg_free(msg);
684 } 684 }
685 685
686 goto psm_loop; 686 goto psm_loop;
687 } 687 }
688 688
689 /* The connection object is broken */ 689 /* The connection object is broken */
690 if (event == FDEVP_CNX_ERROR) { 690 if (event == FDEVP_CNX_ERROR) {
691 switch (cur_state) { 691 switch (cur_state) {
692 case STATE_WAITCNXACK_ELEC: 692 case STATE_WAITCNXACK_ELEC:
693 /* Abort the initiating side */ 693 /* Abort the initiating side */
694 fd_p_cnx_abort(peer, 0); 694 fd_p_cnx_abort(peer, 0);
695 /* Process the receiver side */ 695 /* Process the receiver side */
696 CHECK_FCT_DO( fd_p_ce_process_receiver(peer), goto psm_end ); 696 CHECK_FCT_DO( fd_p_ce_process_receiver(peer), goto psm_end );
697 break; 697 break;
698 698
699 case STATE_WAITCEA: 699 case STATE_WAITCEA:
700 case STATE_OPEN: 700 case STATE_OPEN:
701 case STATE_OPEN_NEW: 701 case STATE_OPEN_NEW:
702 case STATE_REOPEN: 702 case STATE_REOPEN:
703 case STATE_WAITCNXACK: 703 case STATE_WAITCNXACK:
704 case STATE_SUSPECT: 704 case STATE_SUSPECT:
705 default: 705 default:
706 /* Mark the connection problem */ 706 /* Mark the connection problem */
707 peer->p_flags.pf_cnx_pb = 1; 707 peer->p_flags.pf_cnx_pb = 1;
708 708
709 fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, peer, "The connection was broken", NULL); 709 fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, peer, "The connection was broken", NULL);
710 710
711 /* Destroy the connection, restart the timer to a new connection attempt */ 711 /* Destroy the connection, restart the timer to a new connection attempt */
712 fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc); 712 fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc);
713 713
714 case STATE_CLOSED: 714 case STATE_CLOSED:
715 goto psm_reset; 715 goto psm_reset;
716 716
717 case STATE_CLOSING: 717 case STATE_CLOSING:
718 /* We sent a DPR so we are terminating, do not wait for DPA */ 718 /* We sent a DPR so we are terminating, do not wait for DPA */
719 goto psm_end; 719 goto psm_end;
720 720
721 case STATE_CLOSING_GRACE: 721 case STATE_CLOSING_GRACE:
722 if (peer->p_flags.pf_localterm) /* initiated here */ 722 if (peer->p_flags.pf_localterm) /* initiated here */
723 goto psm_end; 723 goto psm_end;
724 724
725 fd_psm_cleanup(peer, 0); 725 fd_psm_cleanup(peer, 0);
726 726
727 /* Reset the timer for next connection attempt */ 727 /* Reset the timer for next connection attempt */
728 fd_psm_next_timeout(peer, 1, fd_p_dp_newdelay(peer)); 728 fd_psm_next_timeout(peer, 1, fd_p_dp_newdelay(peer));
729 goto psm_loop; 729 goto psm_loop;
730 } 730 }
731 goto psm_loop; 731 goto psm_loop;
732 } 732 }
733 733
734 /* The connection notified a change in endpoints */ 734 /* The connection notified a change in endpoints */
735 if (event == FDEVP_CNX_EP_CHANGE) { 735 if (event == FDEVP_CNX_EP_CHANGE) {
736 /* We actually don't care if we are in OPEN state here... */ 736 /* We actually don't care if we are in OPEN state here... */
737 737
738 /* Cleanup the remote LL and primary addresses */ 738 /* Cleanup the remote LL and primary addresses */
739 CHECK_FCT_DO( fd_ep_filter( &peer->p_hdr.info.pi_endpoints, EP_FL_CONF | EP_FL_DISC | EP_FL_ADV ), /* ignore the error */); 739 CHECK_FCT_DO( fd_ep_filter( &peer->p_hdr.info.pi_endpoints, EP_FL_CONF | EP_FL_DISC | EP_FL_ADV ), /* ignore the error */);
740 CHECK_FCT_DO( fd_ep_clearflags( &peer->p_hdr.info.pi_endpoints, EP_FL_PRIMARY ), /* ignore the error */); 740 CHECK_FCT_DO( fd_ep_clearflags( &peer->p_hdr.info.pi_endpoints, EP_FL_PRIMARY ), /* ignore the error */);
741 741
742 /* Get the new ones */ 742 /* Get the new ones */
743 CHECK_FCT_DO( fd_cnx_getremoteeps(peer->p_cnxctx, &peer->p_hdr.info.pi_endpoints), /* ignore the error */); 743 CHECK_FCT_DO( fd_cnx_getremoteeps(peer->p_cnxctx, &peer->p_hdr.info.pi_endpoints), /* ignore the error */);
744 744
745 /* We do not support local endpoints change currently, but it could be added here if needed (refresh fd_g_config->cnf_endpoints) */ 745 /* We do not support local endpoints change currently, but it could be added here if needed (refresh fd_g_config->cnf_endpoints) */
746 { 746 {
747 char * buf = NULL; 747 char * buf = NULL;
748 size_t len = 0; 748 size_t len = 0;
749 LOG_D("Got low layer notification (IGNORED): remote endpoint(s) changed: %s", fd_ep_dump(&buf, &len, NULL, 0, 0, &peer->p_hdr.info.pi_endpoints) ?: "error"); 749 LOG_D("Got low layer notification (IGNORED): remote endpoint(s) changed: %s", fd_ep_dump(&buf, &len, NULL, 0, 0, &peer->p_hdr.info.pi_endpoints) ?: "error");
750 free(buf); 750 free(buf);
751 } 751 }
752 752
753 /* Done */ 753 /* Done */
754 goto psm_loop; 754 goto psm_loop;
755 } 755 }
756 756
757 /* A new connection was established and CER containing this peer id was received */ 757 /* A new connection was established and CER containing this peer id was received */
758 if (event == FDEVP_CNX_INCOMING) { 758 if (event == FDEVP_CNX_INCOMING) {
759 struct cnx_incoming * params = ev_data; 759 struct cnx_incoming * params = ev_data;
760 ASSERT(params); 760 ASSERT(params);
761 761
762 /* Handle the message */ 762 /* Handle the message */
763 CHECK_FCT_DO( fd_p_ce_handle_newCER(&params->cer, peer, &params->cnx, params->validate), goto psm_end ); 763 CHECK_FCT_DO( fd_p_ce_handle_newCER(&params->cer, peer, &params->cnx, params->validate), goto psm_end );
764 764
765 /* Cleanup if needed */ 765 /* Cleanup if needed */
766 if (params->cnx) { 766 if (params->cnx) {
767 fd_cnx_destroy(params->cnx); 767 fd_cnx_destroy(params->cnx);
768 params->cnx = NULL; 768 params->cnx = NULL;
769 } 769 }
770 if (params->cer) { 770 if (params->cer) {
771 CHECK_FCT_DO( fd_msg_free(params->cer), ); 771 CHECK_FCT_DO( fd_msg_free(params->cer), );
772 params->cer = NULL; 772 params->cer = NULL;
773 } 773 }
774 774
775 /* Loop */ 775 /* Loop */
776 free(ev_data); 776 free(ev_data);
777 goto psm_loop; 777 goto psm_loop;
778 } 778 }
779 779
780 /* A new connection has been established with the remote peer */ 780 /* A new connection has been established with the remote peer */
781 if (event == FDEVP_CNX_ESTABLISHED) { 781 if (event == FDEVP_CNX_ESTABLISHED) {
782 struct cnxctx * cnx = ev_data; 782 struct cnxctx * cnx = ev_data;
783 783
784 /* Release the resources of the connecting thread */ 784 /* Release the resources of the connecting thread */
785 CHECK_POSIX_DO( pthread_join( peer->p_ini_thr, NULL), /* ignore, it is not a big deal */); 785 CHECK_POSIX_DO( pthread_join( peer->p_ini_thr, NULL), /* ignore, it is not a big deal */);
786 peer->p_ini_thr = (pthread_t)NULL; 786 peer->p_ini_thr = (pthread_t)NULL;
787 787
788 switch (cur_state) { 788 switch (cur_state) {
789 case STATE_WAITCNXACK_ELEC: 789 case STATE_WAITCNXACK_ELEC:
790 case STATE_WAITCNXACK: 790 case STATE_WAITCNXACK:
791 LOG_D("%s: Connection established, %s", peer->p_hdr.info.pi_diamid, fd_cnx_getid(cnx)); 791 LOG_D("%s: Connection established, %s", peer->p_hdr.info.pi_diamid, fd_cnx_getid(cnx));
792 fd_p_ce_handle_newcnx(peer, cnx); 792 fd_p_ce_handle_newcnx(peer, cnx);
793 break; 793 break;
794 794
795 default: 795 default:
796 /* Just abort the attempt and continue */ 796 /* Just abort the attempt and continue */
797 TRACE_DEBUG(FULL, "Connection attempt successful but current state is %s, closing... (too slow?)", STATE_STR(cur_state)); 797 TRACE_DEBUG(FULL, "Connection attempt successful but current state is %s, closing... (too slow?)", STATE_STR(cur_state));
798 fd_cnx_destroy(cnx); 798 fd_cnx_destroy(cnx);
799 } 799 }
800 800
801 goto psm_loop; 801 goto psm_loop;
802 } 802 }
803 803
804 /* A new connection has not been established with the remote peer */ 804 /* A new connection has not been established with the remote peer */
805 if (event == FDEVP_CNX_FAILED) { 805 if (event == FDEVP_CNX_FAILED) {
806 806
807 /* Release the resources of the connecting thread */ 807 /* Release the resources of the connecting thread */
808 CHECK_POSIX_DO( pthread_join( peer->p_ini_thr, NULL), /* ignore, it is not a big deal */); 808 CHECK_POSIX_DO( pthread_join( peer->p_ini_thr, NULL), /* ignore, it is not a big deal */);
809 peer->p_ini_thr = (pthread_t)NULL; 809 peer->p_ini_thr = (pthread_t)NULL;
810 810
811 switch (cur_state) { 811 switch (cur_state) {
812 case STATE_WAITCNXACK_ELEC: 812 case STATE_WAITCNXACK_ELEC:
813 /* Abort the initiating side */ 813 /* Abort the initiating side */
814 fd_p_cnx_abort(peer, 0); 814 fd_p_cnx_abort(peer, 0);
815 /* Process the receiver side */ 815 /* Process the receiver side */
816 CHECK_FCT_DO( fd_p_ce_process_receiver(peer), goto psm_end ); 816 CHECK_FCT_DO( fd_p_ce_process_receiver(peer), goto psm_end );
817 break; 817 break;
818 818
819 case STATE_WAITCNXACK: 819 case STATE_WAITCNXACK:
820 /* Go back to CLOSE */ 820 /* Go back to CLOSE */
821 fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc); 821 fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc);
822 goto psm_reset; 822 goto psm_reset;
823 823
824 default: 824 default:
825 /* Just ignore */ 825 /* Just ignore */
826 TRACE_DEBUG(FULL, "Connection attempt failed but current state is %s, ignoring...", STATE_STR(cur_state)); 826 TRACE_DEBUG(FULL, "Connection attempt failed but current state is %s, ignoring...", STATE_STR(cur_state));
827 } 827 }
828 828
829 goto psm_loop; 829 goto psm_loop;
830 } 830 }
831 831
832 /* The timeout for the current state has been reached */ 832 /* The timeout for the current state has been reached */
833 if (event == FDEVP_PSM_TIMEOUT) { 833 if (event == FDEVP_PSM_TIMEOUT) {
834 switch (cur_state) { 834 switch (cur_state) {
835 case STATE_OPEN: 835 case STATE_OPEN:
836 case STATE_REOPEN: 836 case STATE_REOPEN:
837 case STATE_OPEN_NEW: 837 case STATE_OPEN_NEW:
838 CHECK_FCT_DO( fd_p_dw_timeout(peer), goto psm_end ); 838 CHECK_FCT_DO( fd_p_dw_timeout(peer), goto psm_end );
839 goto psm_loop; 839 goto psm_loop;
840 840
841 case STATE_CLOSED: 841 case STATE_CLOSED:
842 LOG_D("%s: Connecting...", peer->p_hdr.info.pi_diamid); 842 LOG_D("%s: Connecting...", peer->p_hdr.info.pi_diamid);
843 CHECK_FCT_DO( fd_psm_change_state(peer, STATE_WAITCNXACK), goto psm_end ); 843 CHECK_FCT_DO( fd_psm_change_state(peer, STATE_WAITCNXACK), goto psm_end );
844 fd_psm_next_timeout(peer, 0, CNX_TIMEOUT); 844 fd_psm_next_timeout(peer, 0, CNX_TIMEOUT);
845 CHECK_FCT_DO( fd_p_cnx_init(peer), goto psm_end ); 845 CHECK_FCT_DO( fd_p_cnx_init(peer), goto psm_end );
846 goto psm_loop; 846 goto psm_loop;
847 847
848 case STATE_SUSPECT: 848 case STATE_SUSPECT:
849 /* Mark the connection problem */ 849 /* Mark the connection problem */
850 peer->p_flags.pf_cnx_pb = 1; 850 peer->p_flags.pf_cnx_pb = 1;
851 case STATE_WAITCNXACK: 851 case STATE_WAITCNXACK:
852 case STATE_WAITCEA: 852 case STATE_WAITCEA:
853 fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, peer, "Timeout while waiting for remote peer", NULL); 853 fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, peer, "Timeout while waiting for remote peer", NULL);
854 case STATE_CLOSING: 854 case STATE_CLOSING:
855 /* Destroy the connection, restart the timer to a new connection attempt */ 855 /* Destroy the connection, restart the timer to a new connection attempt */
856 fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc); 856 fd_psm_next_timeout(peer, 1, peer->p_hdr.info.config.pic_tctimer ?: fd_g_config->cnf_timer_tc);
857 goto psm_reset; 857 goto psm_reset;
858 858
859 case STATE_CLOSING_GRACE: 859 case STATE_CLOSING_GRACE:
860 /* The grace period is completed, now close */ 860 /* The grace period is completed, now close */
861 if (peer->p_flags.pf_localterm) 861 if (peer->p_flags.pf_localterm)
862 goto psm_end; 862 goto psm_end;
863 863
864 fd_psm_cleanup(peer, 0); 864 fd_psm_cleanup(peer, 0);
865 /* Reset the timer for next connection attempt */ 865 /* Reset the timer for next connection attempt */
866 fd_psm_next_timeout(peer, 1, fd_p_dp_newdelay(peer)); 866 fd_psm_next_timeout(peer, 1, fd_p_dp_newdelay(peer));
867 goto psm_loop; 867 goto psm_loop;
868 868
869 case STATE_WAITCNXACK_ELEC: 869 case STATE_WAITCNXACK_ELEC:
870 /* Abort the initiating side */ 870 /* Abort the initiating side */
871 fd_p_cnx_abort(peer, 0); 871 fd_p_cnx_abort(peer, 0);
872 /* Process the receiver side */ 872 /* Process the receiver side */
873 CHECK_FCT_DO( fd_p_ce_process_receiver(peer), goto psm_end ); 873 CHECK_FCT_DO( fd_p_ce_process_receiver(peer), goto psm_end );
874 goto psm_loop; 874 goto psm_loop;
875 875
876 default: 876 default:
877 ASSERT(0); /* implementation problem, we did not foresee this case? */ 877 ASSERT(0); /* implementation problem, we did not foresee this case? */
878 } 878 }
879 } 879 }
880 880
881 /* Default action : the handling has not yet been implemented. [for debug only] */ 881 /* Default action : the handling has not yet been implemented. [for debug only] */
882 TRACE_DEBUG(INFO, "Missing handler in PSM for '%s'\t<-- '%s'", STATE_STR(cur_state), fd_pev_str(event)); 882 TRACE_DEBUG(INFO, "Missing handler in PSM for '%s'\t<-- '%s'", STATE_STR(cur_state), fd_pev_str(event));
883 psm_reset: 883 psm_reset:
884 if (peer->p_flags.pf_delete) 884 if (peer->p_flags.pf_delete)
885 goto psm_end; 885 goto psm_end;
886 fd_psm_cleanup(peer, 0); 886 fd_psm_cleanup(peer, 0);
887 goto psm_loop; 887 goto psm_loop;
888 888
889 psm_end: 889 psm_end:
890 cur_state = fd_peer_getstate(peer); 890 cur_state = fd_peer_getstate(peer);
891 if ((cur_state == STATE_CLOSING) || (cur_state == STATE_CLOSING_GRACE)) { 891 if ((cur_state == STATE_CLOSING) || (cur_state == STATE_CLOSING_GRACE)) {
892 LOG_N("%s: Going to ZOMBIE state (no more activity) after normal shutdown", peer->p_hdr.info.pi_diamid); 892 LOG_N("%s: Going to ZOMBIE state (no more activity) after normal shutdown", peer->p_hdr.info.pi_diamid);
893 } else { 893 } else {
909 /************************************************************************/ 909 /************************************************************************/
/* Start the Peer State Machine for one peer: create its event FIFO and its PSM thread.
 *
 * peer    : the peer structure to activate; must currently be in STATE_NEW
 *           (enforced by the CHECK_PARAMS below).
 * returns : 0 on success, or an error code from the failing CHECK_* macro.
 *
 * On success the peer owns a new p_events FIFO and a running p_psm thread
 * executing p_psm_th; all further state transitions happen in that thread.
 */
int fd_psm_begin(struct fd_peer * peer )
{
	TRACE_ENTRY("%p", peer);
	
	/* Check the peer and state are OK -- a PSM must only ever be started once, from STATE_NEW */
	CHECK_PARAMS( fd_peer_getstate(peer) == STATE_NEW );
	
	/* Create the FIFO for events; the PSM thread blocks on this queue */
	CHECK_FCT( fd_fifo_new(&peer->p_events, 0) );
	
	/* Create the PSM controller thread; it consumes p_events until the peer terminates */
	CHECK_POSIX( pthread_create( &peer->p_psm, NULL, p_psm_th, peer ) );
	
	/* We're done */
	return 0;
}
927 927
928 /* End the PSM (clean ending) */ 928 /* End the PSM (clean ending) */
929 int fd_psm_terminate(struct fd_peer * peer, char * reason ) 929 int fd_psm_terminate(struct fd_peer * peer, char * reason )
930 { 930 {
931 TRACE_ENTRY("%p", peer); 931 TRACE_ENTRY("%p", peer);
932 CHECK_PARAMS( CHECK_PEER(peer) ); 932 CHECK_PARAMS( CHECK_PEER(peer) );
933 933
934 if (fd_peer_getstate(peer) != STATE_ZOMBIE) { 934 if (fd_peer_getstate(peer) != STATE_ZOMBIE) {
935 CHECK_FCT( fd_event_send(peer->p_events, FDEVP_TERMINATE, 0, reason) ); 935 CHECK_FCT( fd_event_send(peer->p_events, FDEVP_TERMINATE, 0, reason) );
936 } else { 936 } else {
937 TRACE_DEBUG(FULL, "Peer '%s' was already terminated", peer->p_hdr.info.pi_diamid); 937 TRACE_DEBUG(FULL, "Peer '%s' was already terminated", peer->p_hdr.info.pi_diamid);
938 } 938 }
941 941
/* Abort the PSM of a peer: forcefully end its thread and release PSM-owned resources.
 * (Note: "abord" is a historical misspelling of "abort" kept for API compatibility.)
 *
 * peer : the peer whose PSM is being torn down.
 *
 * Unlike fd_psm_terminate, this does not go through a clean DPR/DPA shutdown:
 * the thread is cancelled directly. Errors from the helpers are logged and
 * ignored so teardown always proceeds as far as possible.
 */
void fd_psm_abord(struct fd_peer * peer )
{
	TRACE_ENTRY("%p", peer);
	
	/* Cancel PSM thread -- failure is ignored, we still free the resources below */
	CHECK_FCT_DO( fd_thr_term(&peer->p_psm), /* continue */ );
	
	/* Cleanup the data; the '1' flag requests a terminal cleanup -- TODO(review): confirm flag semantics against fd_psm_cleanup */
	fd_psm_cleanup(peer, 1);
	
	/* Destroy the event list the PSM thread was consuming */
	CHECK_FCT_DO( fd_fifo_del(&peer->p_events), /* continue */ );
	
	/* Remaining cleanups are performed in fd_peer_free */
	return;
}
959 959
"Welcome to our mercurial repository"