diff freeDiameter/p_out.c @ 34:0e2b57789361

Backup for the WE, some warnings remaining
author Sebastien Decugis <sdecugis@nict.go.jp>
date Fri, 30 Oct 2009 17:23:06 +0900
parents e6fcdf12b9a0
children 6486e97f56ae
line wrap: on
line diff
--- a/freeDiameter/p_out.c	Thu Oct 29 18:05:45 2009 +0900
+++ b/freeDiameter/p_out.c	Fri Oct 30 17:23:06 2009 +0900
@@ -36,29 +36,100 @@
 #include "fD.h"
 
 /* Alloc a new hbh for requests, bufferize the message and send on the connection, save in sentreq if provided */
-static int do_send(struct msg ** msg, struct cnxctx * cnx, uint32_t * hbh, struct fd_list * sentreq)
+static int do_send(struct msg ** msg, struct cnxctx * cnx, uint32_t * hbh, struct sr_list * srl)
 {
-	TRACE_ENTRY("%p %p %p %p", msg, cnx, hbh, sentreq);
+	struct msg_hdr * hdr;
+	int msg_is_a_req;
+	uint8_t * buf;
+	size_t sz;
+	int ret;
+	
+	TRACE_ENTRY("%p %p %p %p", msg, cnx, hbh, srl);
+	
+	/* Retrieve the message header */
+	CHECK_FCT( fd_msg_hdr(*msg, &hdr) );
+	
+	msg_is_a_req = (hdr->msg_flags & CMD_FLAG_REQUEST);
+	if (msg_is_a_req) {
+		CHECK_PARAMS(hbh && srl);
+		/* Alloc the hop-by-hop id and increment the value for next message */
+		hdr->msg_hbhid = *hbh;
+		*hbh = hdr->msg_hbhid + 1;
+	}
+	
+	/* Create the message buffer */
+	CHECK_FCT(fd_msg_bufferize( *msg, &buf, &sz ));
 	
-	TODO("If message is a request");
-		TODO("Alloc new *hbh");
+	/* Send the message */
+	pthread_cleanup_push( free, buf );
+	CHECK_FCT_DO( ret = fd_cnx_send(cnx, buf, sz), { free(buf); return ret; } );
+	pthread_cleanup_pop(1);
 	
-	TODO("Bufferize the message, send it");
+	/* Save a request */
+	if (msg_is_a_req) {
+		CHECK_FCT_DO( fd_p_sr_store(srl, msg, &hdr->msg_hbhid),
+			{
+				fd_log_debug("The following request was sent successfully but not saved locally:\n" );
+				fd_log_debug("	(as a result the matching answer will be discarded)\n" );
+				fd_msg_dump_walk(NONE, *msg);
+			} );
+				
+	}
 	
-	TODO("Save in sentreq or free")
+	/* Free answers and unsaved requests */
+	if (*msg) {
+		CHECK_FCT( fd_msg_free(*msg) );
+		*msg = NULL;
+	}
 	
-	return ENOTSUP;
+	return 0;
+}
+
+static void cleanup_requeue(void * arg)
+{
+	struct msg *msg = arg;
+	CHECK_FCT_DO(fd_fifo_post(fd_g_outgoing, &msg),
+			CHECK_FCT_DO(fd_msg_free(msg), /* What can we do more? */));
 }
 
 /* The code of the "out" thread */
 static void * out_thr(void * arg)
 {
-	TODO("Pick next message in peer->p_tosend");
-	TODO("do_send, log errors");
-	TODO("In case of cancellation, requeue the message");
-	return NULL;
+	struct fd_peer * peer = arg;
+	ASSERT( CHECK_PEER(peer) );
+	
+	/* Set the thread name */
+	{
+		char buf[48];
+		sprintf(buf, "OUT/%.*s", sizeof(buf) - 5, peer->p_hdr.info.pi_diamid);
+		fd_log_threadname ( buf );
+	}
+	
+	/* Loop until cancelation */
+	while (1) {
+		struct msg * msg;
+		
+		/* Retrieve next message to send */
+		CHECK_FCT_DO( fd_fifo_get(peer->p_tosend, &msg), goto error );
+		
+		/* Now if we are cancelled, we requeue this message */
+		pthread_cleanup_push(cleanup_requeue, msg);
+		
+		/* Send the message, log any error */
+		CHECK_FCT_DO( do_send(&msg, peer->p_cnxctx, &peer->p_hbh, &peer->p_sr),
+			{
+				fd_log_debug("An error occurred while sending this message, it is lost:\n");
+				fd_msg_dump_walk(NONE, msg);
+				fd_msg_free(msg);
+			} );
+			
+		/* Loop */
+		pthread_cleanup_pop(0);
+	}
+	
 error:
-	TODO(" Send an event to the peer ");
+	/* It is not really a connection error, but the effect is the same, we are not able to send anymore message */
+	CHECK_FCT_DO( fd_event_send(peer->p_events, FDEVP_CNX_ERROR, 0, NULL), /* What do we do if it fails? */ );
 	return NULL;
 }
 
@@ -83,7 +154,7 @@
 			cnx = peer->p_cnxctx;
 
 		/* Do send the message */
-		CHECK_FCT( do_send(msg, cnx, hbh, peer ? &peer->p_sentreq : NULL) );
+		CHECK_FCT( do_send(msg, cnx, hbh, peer ? &peer->p_sr : NULL) );
 	}
 	
 	return 0;
"Welcome to our mercurial repository"