Mercurial > hg > waaad
view waaad/peer-out.c @ 371:e86dba02630a
Updated copyright information
author | Sebastien Decugis <sdecugis@nict.go.jp> |
---|---|
date | Mon, 25 May 2009 14:51:46 +0900 |
parents | b9b74d6ac29e |
children |
line wrap: on
line source
/********************************************************************************************************* * Software License Agreement (BSD License) * * Author: Sebastien Decugis <sdecugis@nict.go.jp> * * * * Copyright (c) 2009, WIDE Project and NICT * * All rights reserved. * * * * Redistribution and use of this software in source and binary forms, with or without modification, are * * permitted provided that the following conditions are met: * * * * * Redistributions of source code must retain the above * * copyright notice, this list of conditions and the * * following disclaimer. * * * * * Redistributions in binary form must reproduce the above * * copyright notice, this list of conditions and the * * following disclaimer in the documentation and/or other * * materials provided with the distribution. * * * * * Neither the name of the WIDE Project or NICT nor the * * names of its contributors may be used to endorse or * * promote products derived from this software without * * specific prior written permission of WIDE Project and * * NICT. * * * * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED * * WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A * * PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR * * ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT * * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS * * INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR * * TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF * * ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. * *********************************************************************************************************/ /* Peers facility. * * This file contains the code to send messages pending on a peer. * */ #include "waaad-internal.h" #include "peer-internal.h" /* A structure that is used in the next functions */ struct requeue_data { msg_t * msg; _peer_t * peer; }; /* If thread is canceled while processing a message, we have to requeue this message so it is not lost */ static void cleanup_requeue(void * data) { struct requeue_data * rd = (struct requeue_data *)data; TRACE_ENTRY( "%p", data); CHECK_PARAMS_DO( data, return ); if (rd->msg) { CHECK_FCT_DO( meq_post(rd->peer->p_out_q, rd->msg), { (void) msg_free(rd->msg, 1); return; } ); rd->msg = NULL; } return; } /* The function that actually renders a msg_t structure and write it to the socket, using security mechanism */ int _peer_out_sendmsg(_peer_t * peer, msg_t * msg) { unsigned char *buf = NULL; size_t len; TRACE_DEBUG(FULL, "Sending message to peer '%s':", peer->p_diamid); msg_dump_walk(FULL, msg); /* Render the message as a buffer */ CHECK_FCT_DO( msg_bufferize ( msg, &buf, &len ), { /* What are we supposed to do? just discard the message or start failover? */ /* => let's discard this message, with a log entry */ log_error("An error occurred while generating a message for sending to peer '%s', message dropped.\n", peer->p_diamid); CHECK_FCT_DO(msg_free(msg, 1), ); return 0; /* is this OK with the cancelation handler??? */ } ); pthread_cleanup_push( cleanup_buffer, buf ); /* Now send the message using the appropriate security module */ ASSERT( peer && peer->p_secmod && peer->p_secmod->sec_send_protect ); CHECK_FCT( (*peer->p_secmod->sec_send_protect) ( &peer->p_sec_session, &peer->p_ext_session, buf, len ) ); /* free the buffer */ pthread_cleanup_pop( 1 ); return 0; } int _peer_out_savesentreq(_peer_t * peer, msg_t * msg, uint32_t hbh) { uti_list_t * next; p_sent_t * new; CHECK_MALLOC( new = (p_sent_t *)malloc(sizeof(p_sent_t)) ); uti_list_init(&new->chain, msg); new->hbh = hbh; /* Update the internal "routable" data now, so that the failover operation is faster */ (void)msg_is_routable(msg); /* Now search place to save this hop-by-hop id in the sent messages. Since hbh is a monotonic incrementing counter, we search backward. (we actually have to search only in case where counter overflowed, otherwise it's always added at the last position) */ for (next = &peer->p_sent; next->prev != &peer->p_sent; next = next->prev) { if (((p_sent_t *)(next->prev))->hbh < hbh) break; } /* Now save the new message in the list */ uti_list_insert_before(next, &new->chain); TRACE_DEBUG(FULL, "Request (hbh %x) saved in list for peer '%s'", hbh, peer->p_diamid); return 0; } /* The code of the p_out_th tread */ /* This thread pulls messages from the p_out_q queue, and deal with them */ static void * _peer_out_th(void * arg) { struct requeue_data rd; THREAD_NAME_PEER( "Outgoing", arg ); TRACE_ENTRY( "%p", arg ); CHECK_PARAMS_DO( VALIDATE_PEER(arg), return NULL ); rd.peer = _P(arg); /* This thread loops until it is canceled */ while (1) { msg_data_t * hdr; uint32_t hbh; /* the hbh to use for sending the message on next hop */ pthread_testcancel(); /* Get the next message to be sent. */ CHECK_FCT_DO( meq_get ( rd.peer->p_out_q, &(rd.msg) ), goto error ); /* We removed the message from the queue, we need to reput it if we're interrupted now */ pthread_cleanup_push( cleanup_requeue, &rd ); /* Get the header information of the message */ CHECK_FCT_DO( msg_data(rd.msg, &hdr), { CHECK_FCT_DO(msg_free(rd.msg, 1), ); goto error; } ); hbh = hdr->msg_hbhid; /* Save it if it's a request */ if (hdr->msg_flags & CMD_FLAG_REQUEST) { /* Get the next available hbh id -- no need to lock since we're the only thread modifying this value. */ hbh = rd.peer->p_hbh; rd.peer->p_hbh = hbh + 1; /* Lock the peer */ CHECK_POSIX_DO( pthread_mutex_lock(&(rd.peer)->p_lock), goto error ); /* Save the message in the sent queue first, so that if we are cancelled the failover will resend it */ CHECK_FCT_DO( _peer_out_savesentreq( rd.peer, rd.msg, hbh ), { CHECK_POSIX_DO( pthread_mutex_unlock(&(rd.peer)->p_lock), /* continue */ ); log_error("An error occurred while saving a message to send to peer '%s', message dropped.", (rd.peer)->p_diamid); CHECK_FCT_DO(msg_free(rd.msg, 1), ); continue; } ); /* and unlock */ CHECK_POSIX_DO( pthread_mutex_unlock(&(rd.peer)->p_lock), goto error ); } /* The message is now safe, we don't need to requeue it anymore */ pthread_cleanup_pop( 0 ); { uint32_t prev = hdr->msg_hbhid; hdr->msg_hbhid = hbh; /* Send the message */ CHECK_FCT_DO( _peer_out_sendmsg(rd.peer, rd.msg), { log_error("An error occurred while sending a message to peer '%s'.", (rd.peer)->p_diamid); } ); /* Restore */ hdr->msg_hbhid = prev; } /* If it was an answer, we can discard the message */ if ( ! (hdr->msg_flags & CMD_FLAG_REQUEST)) { CHECK_FCT_DO(msg_free(rd.msg, 1), /* should we fail? */ ); } /* loop to the next message */ } error: /* Signal the parent thread that we have exited */ CHECK_POSIX_DO( _peer_events_send(rd.peer, PEVENT_TH_TERM_OUT, NULL), /* ignore */ ); /* Return */ TRACE_DEBUG (FULL, "Thread is terminated."); return NULL; } /* Start the emitter thread for a peer */ int _peer_out_start(_peer_t * peer) { CHECK_PARAMS( VALIDATE_PEER(peer) ); CHECK_PARAMS( peer->p_secmod ); CHECK_PARAMS( peer->p_sec_session.conn ); CHECK_PARAMS( peer->p_out_q ); CHECK_PARAMS( peer->p_out_th == (pthread_t)NULL ); /* Ok the peer is in appropriate state, let's start the thread */ CHECK_POSIX( pthread_create( &peer->p_out_th, NULL, _peer_out_th, peer ) ); /* done */ return 0; } /* Stop this receiving thread */ int _peer_out_stop(_peer_t * peer) { CHECK_PARAMS( VALIDATE_PEER(peer) ); if (peer->p_out_th != (pthread_t)NULL) return _thread_term(&peer->p_out_th); else return 0; }