changeset 150:5fc78f5e3731

Added more code to the peer module; now opens listening sockets; code still incomplete and highly experimental
author Sebastien Decugis <sdecugis@nict.go.jp>
date Tue, 09 Sep 2008 17:13:26 +0900
parents 9ceee08d0c62
children f07c17bb0322
files waaad/peer-internal.h waaad/peer-listener.c waaad/peer-recv.c waaad/peer-thctl.c waaad/peer.c
diffstat 5 files changed, 480 insertions(+), 43 deletions(-) [+]
line wrap: on
line diff
--- a/waaad/peer-internal.h	Tue Sep 09 17:12:48 2008 +0900
+++ b/waaad/peer-internal.h	Tue Sep 09 17:13:26 2008 +0900
@@ -71,6 +71,11 @@
 #define  CNX_TIMEOUT	10	/* in seconds */
 #endif /* CNX_TIMEOUT */
 
+/* Timeout for receiving a CER after incoming connection is established */
+#ifndef INCNX_TIMEOUT
+#define  INCNX_TIMEOUT	 20	/* in seconds */
+#endif /* INCNX_TIMEOUT */
+
 /* States of a peer */
 typedef enum {
 	/* Stable states */
@@ -99,6 +104,10 @@
 	PEVENT_DISCONNECTED,	/* the socket has been disconnected */
 	PEVENT_EXPIRE,		/* the peer lifetime is expiring; call p_addinfo.pa_cb or delete */
 	PEVENT_MSGRCVD,		/* a message was received */
+	PEVENT_SND_FAILED,	/* Failed to send a message (means probably connection is broken) */
+	PEVENT_ASSOC_CHG,	/* association changed (SCTP notification) */
+	PEVENT_PEERADDR_CHG,	/* peer address changed (SCTP notification) */
+	PEVENT_REMOTE_ERR,	/* Remote error (SCTP notification) */
 	PEVENT_MAX	/* To be continued */
 } _pevent_t;
 
@@ -149,6 +158,9 @@
 	
 	/* Connection information */
 	int		 p_sock;	/* We use standard Berkeley sockets for both TCP and SCTP connections */
+	uint16_t	 p_ostr;	/* number of outbound streams (for SCTP) */
+	uint16_t	 p_istr;	/* number of inbound streams (for SCTP) */
+	uint16_t	 p_curstr;	/* id of the last used out stream */
 	int		 p_sock_tmp;	/* In case of election, store the socket on which the CER was received here, temporarily */
 	size_t		 p_peeraddr_sz;	/* Number of items in the array p_peeraddr bellow */
 	sSS 		*p_peeraddr;	/* Array of the attachment points of the remote peer (received in Host-IP-Address AVPs) */
@@ -189,7 +201,8 @@
 extern pthread_cond_t	_peers_lists_cond;/* the condvar that is used by _peer_expire_th */
 
 /* The eye-catcher value */
-#define PEER_EYEC	0x23350B7
+#define PEER_EYEC	0x023350B7
+#define PEER_EYEC_DUMMY	0xD23350B7
 
 /* Cast macro */
 #define _P( _peerptr_ ) ((_peer_t *)( _peerptr_ ))
@@ -197,8 +210,39 @@
 /* Check macro */
 #define VALIDATE_PEER( _peer_ ) 				\
 	(((_peer_) != NULL) 					\
-	&& (_P(_peer_)->p_eyec == PEER_EYEC)			\
-	&& (_P(_peer_)->p_global.head != &g_peers_deleted))	
+	&& ((_P(_peer_)->p_eyec == PEER_EYEC) 			\
+	  || (_P(_peer_)->p_eyec == PEER_EYEC_DUMMY))		\
+	&& (_P(_peer_)->p_exp_list.head != &_peers_deleted))	
+
+
+/* Initialize a sec_session_t with callbacks for TCP or SCTP */
+#define SEC_SESS_INIT( _sessobj_, _proto_, _peer_, _psock_ ) {	\
+	sec_session_t * __sess = (sec_session_t *) (_sessobj_);	\
+	peer_t * __peer = (peer_t *) (_peer_);			\
+	sec_conn_t * __conn = (sec_conn_t *) (_psock_);		\
+	memset(__sess, 0, sizeof(sec_session_t));		\
+	__sess->peer = __peer;					\
+	__sess->conn = __conn;					\
+	__sess->proto = (_proto_);				\
+	switch (__sess->proto) {				\
+	case IPPROTO_TCP:					\
+		__sess->cbs.tcp.send_data = 			\
+			(sec_send_tcp_cb) _peer_tcp_send;	\
+		__sess->cbs.tcp.recv_data = 			\
+			(sec_recv_tcp_cb) _peer_tcp_recv;	\
+		break;						\
+	case IPPROTO_SCTP:					\
+		__sess->cbs.sctp.send_data = 			\
+			(sec_send_sctp_cb) _peer_sctp_send;	\
+		__sess->cbs.sctp.recv_data = 			\
+			(sec_recv_sctp_cb) _peer_sctp_recv;	\
+		break;						\
+	default: assert(0);					\
+	}							\
+}
+
+
+
 
 /*
  * The internal functions 
@@ -210,12 +254,14 @@
 int _peer_destroy(_peer_t * peer, int locked);
 
 
+
 /*** peer-events.c ***/
 
 /* Sending events to a peer */
 int _peer_sendevent(_peer_t * peer, _pevent_t event, void * data);
 
 
+
 /*** peer-listener.c ***/
 
 /* Create all the listening threads and sockets */
@@ -225,24 +271,28 @@
 int _peer_stop_server();
 
 
+
 /*** peer-psm.c ***/
 
 /* Code of the thread for the peer state machine (one per peer) */
 void * _peer_state_machine_th(void * arg);
 
 
+
 /*** peer-recv.c ***/
 
 /* Code of the thread for incoming messages (one per peer) */
 void * _peer_in_th(void * arg);
 
 
+
 /*** peer-send.c ***/
 
 /* Code of the thread for outgoing messages (one per peer) */
 void * _peer_out_th(void * arg);
 
 
+
 /*** peer-sctp.c ***/
 
 /* Create a listening socket */
@@ -255,10 +305,11 @@
 int _peer_sctp_connect( const sSA *addr, socklen_t addrlen, int *sock, uint16_t *ostreams, uint16_t *istreams );
 
 /* Send data using a stream */
-ssize_t _peer_sctp_send(_peer_t * peer, int *streamid, const void * data, size_t len);
+ssize_t _peer_sctp_send( sec_session_t * sess, int *streamid, const void * data, size_t len);
 
 /* Receive a message, get the stream id */
-ssize_t _peer_sctp_recv(_peer_t * peer, int * streamid, void ** data, size_t *len);
+ssize_t _peer_sctp_recv( sec_session_t * sess, uint16_t * streamid, void ** data, size_t *len);
+
 
 
 /*** peer-tcp.c ***/
@@ -272,11 +323,19 @@
 /* Connect to a remote server */
 int _peer_tcp_connect( const sSA *addr, socklen_t addrlen, int *sock );
 
+/* Send data over a socket */
+ssize_t _peer_tcp_send( sec_session_t * sess, void *data, size_t len);
+
+/* Receive data from a socket */
+ssize_t _peer_tcp_recv( sec_session_t * sess, void *data, size_t len);
+
+
 
 /*** peer-thctl.c ***/
 
 /* Cancel a thread */
-int _peer_cancel_th(pthread_t * thread);
+int _peer_cancel_th(pthread_t * thread, int stop_on_error);
+
 
 
 /*** peer-expire.c ***/
@@ -288,5 +347,6 @@
 void * _peer_expire_th (void * arg);
 
 
+
 #endif /* ! _PEER_INTERNAL_H */
 
--- a/waaad/peer-listener.c	Tue Sep 09 17:12:48 2008 +0900
+++ b/waaad/peer-listener.c	Tue Sep 09 17:13:26 2008 +0900
@@ -50,55 +50,352 @@
 	int		proto;	/* IPPROTO_TCP or IPPROTO_SCTP */
 	int		sock;	/* the socket identifier */
 	pthread_t	listen;	/* the thread listening on this socket */
-	int		started;/* the thread is started */
-} _peer_conninfo_t;
+	int		started;/* 1 the thread is started, 2 the thread already exited */
+	uti_list_t	clients;/* List of clients informations */
+	pthread_mutex_t	climtx;	/* mutex protecting this list */
+} _peer_servinfo_t;
+
+/* Store a new client information */
+typedef struct {
+	uti_list_t	list;	/* link to the list of pending clients. the o fields points to the mutex */
+	pthread_t	thr;	/* the thread handling this client */
+	int		proto;	/* IPPROTO_TCP or IPPROTO_SCTP */
+	int		sock;	/* the socket identifier */
+	uint16_t	ostr;	/* outbound streams, for SCTP */
+	uint16_t	istr;	/* inbound streams, for SCTP */
+} _peer_cliinfo_t;
+
+#define CI_MTX( _ci_ ) ((pthread_mutex_t *)((_ci_)->list.o))
+
+/* Array of server sockets */
+static _peer_servinfo_t * _psi = NULL;
+
+/* "no sec" security module reference */
+static _sec_item_t _smi = { .sm = NULL };
+
 
-static _peer_conninfo_t * _pci = NULL;
+static void _receiver_th_cleanup_peer (void * arg)
+{
+	int ret;
+	_peer_t * dum;
+	
+	TRACE_DEBUG(FULL, "Cleaning up receiver thread for dummy peer %p", arg);
+	dum = _P(arg);
+	
+	if (dum->p_in_th) {
+		(void) _peer_cancel_th(&dum->p_in_th, 0);
+	}
+	if (dum->p_in_q) {
+		(void) meq_del(&dum->p_in_q);
+	}
+	while (!IS_LIST_EMPTY( &dum->p_events)) {
+		uti_list_t * event = dum->p_events.next;
+		uti_list_unlink(event);
+		free(event);
+	}
+	(void) pthread_cond_destroy(&dum->p_condvar);
+	(void) pthread_mutex_destroy(&dum->p_lock);
+	
+	/* End of cleanup handler */
+	return;
+}
+
+static void _receiver_th_cleanup_lock (void * arg)
+{
+	(void) pthread_mutex_unlock(&(_P(arg)->p_lock));
+	return;
+}
 
 
 /* The code of a receiver thread, to handle an incoming connection, before the CER is received */
 static void * _peer_receiver_th(void * arg)
 {
+	int ret = 0;
+	_peer_cliinfo_t *ci = (_peer_cliinfo_t *) arg;
+	_peer_t dummy;
+	struct timespec	delay;
+	_pe_t * event;
+	msg_t * received;
+	
 	TRACE_ENTRY( "%p", arg );
 	
-	/* Wait for incoming message, until timeout */
+	TRACE_DEBUG(FULL, "Thread created to handle a new client: %p", pthread_self());
+	
+	/* Initialize the dummy peer structure to receive a message */
+	memset(&dummy, 0, sizeof(dummy));
+	
+	dummy.p_eyec = PEER_EYEC_DUMMY;
+	dummy.p_diamid = "(anonymous client)";
+	
+	ret = pthread_mutex_init(&(dummy.p_lock), NULL);
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "Unable to initialize a mutex: %s", strerror(ret));
+		goto end;
+	}
+	ret = pthread_cond_init(&(dummy.p_condvar), NULL);
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "Unable to initialize a condvar: %s", strerror(ret));
+		goto end;
+	}
+	
+	/* The queue where the received message will be posted */
+	uti_list_init(&(dummy.p_events), NULL);
+	ret = meq_new(&(dummy.p_in_q));
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "Unable to create a message queue: %s", strerror(ret));
+		goto end;
+	}
+	
+	/* The security module is set to the "no_sec" module. This simplifies the process of receiving the message. */
+	if (_smi.sm == NULL) {
+		ret = sec_module(0, &_smi);
+		if (ret != 0) {
+			TRACE_DEBUG(INFO, "Unable to get a 'no_sec' security module to handle incoming connections: %s", strerror(ret));
+			goto end;
+		}
+	}
+	dummy.p_secmod = _smi.sm;
+	
+	/* the sec_session_t object used by the security module */
+	SEC_SESS_INIT( &(dummy.p_sec_session), ci->proto, &dummy, &ci->sock );
+	
+	/* Lock the peer to wait for incoming events */
+	ret = pthread_mutex_lock(&(dummy.p_lock));
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "Failed to lock a mutex: %s", strerror(ret));
+		goto end;
+	}
+	
+	/* Create the receiver thread */
+	ret = pthread_create(&(dummy.p_in_th), NULL, _peer_in_th, (void *)&dummy);
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "Failed to create a thread: %s", strerror(ret));
+		goto end;
+	}
+
+	/* Compute the timeout absolute time for receiving the CER message */
+	ret = clock_gettime(CLOCK_REALTIME, &delay);
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "Unable to get the clock time: %s", strerror(errno));
+		goto end;
+	}
+	delay.tv_sec += INCNX_TIMEOUT;
+	
+	/* Stack the cleanup handler, in case we are canceled */
+	pthread_cleanup_push(_receiver_th_cleanup_peer, &dummy);
 	
-	/* if message is not CER, disconnect */
+	/* Now wait for an event to occur. Such events include: connection broken, message received. 
+	 In any case, if we have not received a CER message, we just abort the connection and free the resources.
+	 */
+spurious:
+	pthread_cleanup_push(_receiver_th_cleanup_lock, &dummy);
+	ret = pthread_cond_timedwait( &(dummy.p_condvar), &(dummy.p_lock), &delay );
+	pthread_cleanup_pop(0);
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "Waiting on a condvar failed: %s", strerror(ret));
+		goto end;
+	}
+	/* Check for spurious wakeup */
+	if (IS_LIST_EMPTY( &(dummy.p_events) ))
+		goto spurious;
+	
+	/* Ok, retrieve the next event */
+	event = (_pe_t *)(dummy.p_events.next);
+	uti_list_unlink( _LIST(event) );
+	
+	TRACE_DEBUG(FULL, "Thread %p received event %d", pthread_self(), event->event);
 	
-	/* if CER, check we do not have already a peer with this diameter-id in the list */
+	switch (event->event) {
+		/* The following events are ignored */
+		case PEVENT_ASSOC_CHG:
+		case PEVENT_PEERADDR_CHG:
+			goto spurious; 
+		
+		case PEVENT_MSGRCVD:
+			/* Retrieve the message from the queue */
+			ret = meq_tryget(dummy.p_in_q, &received);
+			if (ret != 0) {
+				TRACE_DEBUG(INFO, "Unable to retrieve the received message: %s", strerror(ret));
+				goto end;
+			}
+			
+			/* Parse the message against the dictionary */
+			ret = msg_parse_dict(received);
+			if (ret == 0) {
+				/* Check we have received a CER */
+				TRACE_DEBUG(INFO, "Received valid message on incoming connection; handling is not implemented yet...");
+				msg_dump_walk(received);
+				
+					/* If we have a valid CER, break */
+					break;
+			}
+			
+			TRACE_DEBUG(INFO, "Failed to parse received message: %s", strerror(ret));
+			(void) msg_free(received, 1);
+			
+			/* fallback to the default case */
+		
+		default:
+			/* We received an invalid event or message */
+				
+			ret = pthread_mutex_unlock(&(dummy.p_lock));
+			if (ret != 0) {
+				TRACE_DEBUG(INFO, "Failed to unlock a mutex: %s", strerror(ret));
+			}
+			
+			/* Cleanup everything */
+			goto end;
+	}
+	
+	/* We received a CER */
+	
+	/* Cancel receiver thread */
+	ret = _peer_cancel_th(&(dummy.p_in_th), 1);
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "An error occurred while canceling a thread: %s", strerror(ret));
+		goto end;
+	}
+	dummy.p_in_th = (pthread_t) NULL;
+	
+	/* Now parse the content of the CER received */
+	
+	/* check we do not have already a peer with this diameter-id in the list */
 	
 	/* depending on the peer state, act as needed */
 	
 	TRACE_DEBUG (INFO, "@@@ %s: not implemented yet.", __FUNCTION__ );
+	
+	pthread_cleanup_pop(0);
+end:
+	/* Destroy the dummy peer */
+	_receiver_th_cleanup_peer(&dummy);
+
+	/* Remove the client information from the list */
+	ret = pthread_mutex_lock(CI_MTX(ci));
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "Failed to lock a mutex: %s", strerror(ret));
+	}
+	
+	uti_list_unlink(_LIST(ci));
+	
+	ret = pthread_mutex_unlock(CI_MTX(ci));
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "Failed to unlock a mutex: %s", strerror(ret));
+	}
+	
+	/* close the socket */
+	shutdown(ci->sock, SHUT_RDWR);
+	
+	/* detach this thread */
+	pthread_detach(pthread_self());
+	
+	/* free this client resource */
+	free(ci);
+	
+	/* Exit */
 	return NULL;
 }
+
 /* The code of the thread waiting for incoming connection. 1 thread per listening socket. */
 static void * _peer_listen_th(void * arg)
 {
 	int ret;
-	_peer_conninfo_t * ci = (_peer_conninfo_t *)arg;
+	_peer_servinfo_t * si = (_peer_servinfo_t *)arg;
+	_peer_cliinfo_t  * ci = NULL;
 	
 	TRACE_ENTRY( "%p", arg );
 	
-	/* (conninfo_t *)arg; */
+	si->started = 1;
+	
+	/* Initialize the list of clients */
+	uti_list_init ( &si->clients, NULL );
+	ret = pthread_mutex_init(&si->climtx, NULL);
+	if (ret != 0) {
+		TRACE_DEBUG(INFO, "Unable to initialize a mutex: %s", strerror(ret));
+		goto error;
+	}
 	
 	/* Listen, to enable remote peers to connect */
-	ret = listen(ci->sock, 5);
+	ret = listen(si->sock, 5);
 	if (ret == -1) {
 		ret = errno;
-		TRACE_DEBUG(INFO, "Unable to listen on the socket: %s", strerror(ret));
-		
+		TRACE_DEBUG(INFO, "Unable to listen on the socket %d: %s", si->sock, strerror(ret));
 		goto error;
 	}
 	
 	/* loop */
-		/* accept() */
+	while (1) {
+		/* Create a cliinfo object to receive new client connection information */
+		ci = (_peer_cliinfo_t *) malloc(sizeof(_peer_cliinfo_t));
+		if (ci == NULL) {
+			TRACE_DEBUG(INFO, "Memory allocation failed!");
+			goto error;
+		}
+		memset(ci, 0, sizeof(_peer_cliinfo_t));
+		uti_list_init ( &ci->list, &si->climtx );
+		ci->proto = si->proto;
+		
+		/* Accept a new client */
+		switch (si->proto) {
+			case IPPROTO_TCP:
+				/* we use a simple accept */
+				ci->sock = accept(si->sock, NULL, NULL);
+				if (ci->sock < 0) {
+					ret = errno;
+					TRACE_DEBUG(INFO, "Unable to accept incoming TCP connection: %s", strerror(ret));
+					goto error;
+				}
+				break;
+			
+			case IPPROTO_SCTP:
+				/* We use our wrapper */
+				ret = _peer_sctp_accept( si->sock, &ci->sock, &ci->ostr, &ci->istr );
+				if (ret != 0) {
+					TRACE_DEBUG(INFO, "Unable to accept incoming SCTP connection: %s", strerror(ret));
+					goto error;
+				}
+				break;
+			
+			default:
+				assert(0);
+		}
 	
-		/* On new connection, create a receiver thread (_peer_receiver_th) */
+		/* Save this new client in the list of the server */
+		ret = pthread_mutex_lock(&si->climtx);
+		if (ret != 0) {
+			TRACE_DEBUG(INFO, "Unable to lock a mutex: %s", strerror(ret));
+			goto error;
+		}
+		
+		uti_list_insert_before ( &si->clients, &ci->list );
+
+		ret = pthread_mutex_unlock(&si->climtx);
+		if (ret != 0) {
+			TRACE_DEBUG(INFO, "Unable to unlock a mutex: %s", strerror(ret));
+			goto error;
+		}
+		
+		/* And now start the received thread for this client */
+		ret = pthread_create( &ci->thr, NULL, _peer_receiver_th, ci);
+		if (ret != 0) {
+			TRACE_DEBUG(INFO, "Unable to create a thread: %s", strerror(ret));
+			goto error;
+		}
+	}
+		
 error:
+	TRACE_DEBUG(INFO, " Error occurred in listening server thread");
+	TRACE_DEBUG(INFO, "  Proto : %s", (si->proto == IPPROTO_TCP) ? "IPPROTO_TCP" : 
+						(si->proto == IPPROTO_SCTP) ? "IPPROTO_SCTP" : 
+							"???");
+	TRACE_DEBUG(INFO, "  Sock  : %d", si->sock);
+
+	si->started = 2;
+
 	/* Signal the daemon that something went wrong here ... how? */
+	#warning "Support not complete yet"	
+
 		
-	
 	TRACE_DEBUG (INFO, "@@@ %s: not implemented yet.", __FUNCTION__ );
 	return NULL;
 }
@@ -108,12 +405,7 @@
 {
 	int ret = 0;
 	int cnt = 1; /* at least one for the SCTP server */
-	
-	/* -- must use these (from config)  --
-		int		 disable_inet4;
-		int		 disable_inet6;
-		int		 disable_tcp;
-	*/
+	int i   = 1;
 	
 	TRACE_ENTRY( );
 	
@@ -121,30 +413,96 @@
 	cnt += _peer_tcp_count_servers();
 	
 	/* Allocate the _pci array */
-	_pci = calloc(cnt, sizeof(_peer_conninfo_t));
-	if (_pci == NULL) {
+	_psi = calloc(cnt, sizeof(_peer_servinfo_t));
+	if (_psi == NULL) {
 		TRACE_DEBUG(INFO, "Memory allocation failed: %s", strerror(errno));
 		return ENOMEM;
 	}
 	
 	/* The first element is the SCTP server, create this one already */
-	_pci[0].proto = IPPROTO_SCTP;
-	ret = _peer_sctp_create_server( &_pci[0].sock );
+	_psi[0].proto = IPPROTO_SCTP;
+	ret = _peer_sctp_create_server( &_psi[0].sock );
 	if (ret != 0) {
 		TRACE_DEBUG(INFO, "An error occurred while creating the SCTP server: %s", strerror(ret));
 		return ret;
 	}
 	
 	/* Now create the TCP servers */
+	if ( ! g_pconf->disable_tcp ) {
+		sSS stor;
+		sSA  * sa  = (sSA  *)&stor;
+		sSA4 * sin = (sSA4 *)&stor;
+		sSA6 * sin6 = (sSA6 *)&stor;
+		sSS  * ss = g_pconf->local_addr_pri_sa;
+		
+		if (ss == NULL) {
+			/* The list of addresses is not specified, bind to any */
+			
+			if (! g_pconf->disable_inet4 ) {
+				
+				memset(&stor, 0, sizeof(sSS));
+				sin->sin_family = AF_INET;
+				sin->sin_port = htons(g_pconf->local_port);
+				sin->sin_addr.s_addr = INADDR_ANY;
+				
+				_psi[i].proto = IPPROTO_TCP;
+				
+				ret = _peer_tcp_create_server( &(_psi[i].sock), sa, sizeof(sSA4) );
+				if (ret != 0) {
+					TRACE_DEBUG(INFO, "An error occurred while creating the TCP server: %s", strerror(ret));
+					return ret;
+				}
+				
+				i++;
+			}
+					
+			if (! g_pconf->disable_inet6 ) {
+				
+				memset(&stor, 0, sizeof(sSS));
+				sin6->sin6_family = AF_INET6;
+				sin6->sin6_port = htons(g_pconf->local_port);
+				sin6->sin6_addr = in6addr_any;
+				
+				_psi[i].proto = IPPROTO_TCP;
+				
+				ret = _peer_tcp_create_server( &(_psi[i].sock), sa, sizeof(sSA6) );
+				if (ret != 0) {
+					TRACE_DEBUG(INFO, "An error occurred while creating the TCP server: %s", strerror(ret));
+					return ret;
+				}
+				
+				i++;
+			}
+		} else {
+			/* The list of addresses to bind to is in the configuration file */
+			while (ss->ss_family != 0) {
+				_psi[i].proto = IPPROTO_TCP;
+				
+				ret = _peer_tcp_create_server( &(_psi[i].sock), (sSA *)ss, sizeof(sSS) );
+				if (ret != 0) {
+					TRACE_DEBUG(INFO, "An error occurred while creating the TCP server: %s", strerror(ret));
+					return ret;
+				}
+				ss++;
+				i++;
+			}
+		}
+		
+		ASSERT(i == cnt);
+	}
 	
-	
+	TRACE_DEBUG(FULL, "All %d server sockets are created, now starting the server threads...", cnt);
 	
 	/* Start all the _peer_listen_th threads */
-	
-	
+	for (i=0; i < cnt; i++) {
+		ret = pthread_create( &_psi[i].listen, NULL, _peer_listen_th, (void *)&_psi[i] );
+		if (ret != 0) {
+			TRACE_DEBUG(INFO, "Failed to create a listener thread: %s", strerror(ret));
+			return ret;
+		}
+	}
 	
-	TRACE_DEBUG (INFO, "@@@ %s: not implemented yet.", __FUNCTION__ );
-	return ENOTSUP;
+	return 0;
 }
 
 int _peer_stop_server()
@@ -152,9 +510,15 @@
 	TRACE_ENTRY( );
 	TRACE_DEBUG (INFO, "@@@ %s: not implemented yet.", __FUNCTION__ );
 	
-	if (_pci) {
-		free(_pci);
-		_pci = NULL;
+	if (_psi) {
+		
+		/* loop in the list, kill the threads if any, close the sockets */
+		
+		/* Also loop in all clients lists and kill the threads */
+		
+		/* then free the object */
+		free(_psi);
+		_psi = NULL;
 	}
 	
 	return ENOTSUP;
--- a/waaad/peer-recv.c	Tue Sep 09 17:12:48 2008 +0900
+++ b/waaad/peer-recv.c	Tue Sep 09 17:13:26 2008 +0900
@@ -49,6 +49,17 @@
 	
 	TRACE_ENTRY( "%p", arg );
 	
+	/* loop until error */
+	
+		/* receive next buffer from security module */
+	
+		/* quickly parse this buffer into a message */
+	
+		/* enqueue this message in the "incoming" queue of the peer */
+	
+		/* post an event "message received" */
+		
+	/* end of loop */
 	
 	TRACE_DEBUG (INFO, "@@@ %s: not implemented yet.", __FUNCTION__ );
 	return NULL;
--- a/waaad/peer-thctl.c	Tue Sep 09 17:12:48 2008 +0900
+++ b/waaad/peer-thctl.c	Tue Sep 09 17:13:26 2008 +0900
@@ -42,7 +42,7 @@
 #include "waaad-internal.h"
 #include "peer-internal.h"	
 
-int _peer_cancel_th(pthread_t * thread)
+int _peer_cancel_th(pthread_t * thread, int stop_on_error)
 {
 	int ret = 0;
 	
@@ -52,14 +52,16 @@
 	ret = pthread_cancel(*thread);
 	if (ret != 0) {
 		TRACE_DEBUG(INFO, "pthread_cancel failed: %s", strerror(ret));
-		return ret;
+		if (stop_on_error)
+			return ret;
 	}
 	
 	/* Now wait for the thread to terminate cleanly */
 	ret = pthread_join(*thread, NULL);
 	if (ret != 0) {
 		TRACE_DEBUG(INFO, "pthread_wait failed: %s", strerror(ret));
-		return ret;
+		if (stop_on_error)
+			return ret;
 	}
 	
 	/* We're done */
--- a/waaad/peer.c	Tue Sep 09 17:12:48 2008 +0900
+++ b/waaad/peer.c	Tue Sep 09 17:13:26 2008 +0900
@@ -123,7 +123,7 @@
 	}
 	
 	/* Destroy the expire thread */
-	ret = _peer_cancel_th(&expire_th);
+	ret = _peer_cancel_th(&expire_th, 0);
 	if (ret != 0) {
 		TRACE_DEBUG(INFO, "Unable to cancel the expiry thread: %s", strerror(ret));
 		/* continue */
@@ -480,7 +480,7 @@
 	
 	/* first, stop the peer state machine if it's running -- this will do most of the cleanup */
 	if ( peer->p_state != STATE_DISABLED) {
-		ret = _peer_cancel_th( &peer->p_psm );
+		ret = _peer_cancel_th( &peer->p_psm, 0 );
 		if (ret != 0) {
 			TRACE_DEBUG(INFO, "Failed to cancel the peer state machine thread: %s", strerror(ret));
 			/* continue anyway */
"Welcome to our mercurial repository"