changeset 1102:1d7b3ebda27f

Implemented the calls to HOOK_DATA_RECEIVED hook
author Sebastien Decugis <sdecugis@freediameter.net>
date Thu, 09 May 2013 16:40:02 +0800
parents 40b48a3997a2
children d8591b1c56cd 6b4a417d2845
files include/freeDiameter/libfdcore.h include/freeDiameter/libfdproto.h libfdcore/cnxctx.c libfdcore/cnxctx.h libfdcore/fdcore-internal.h libfdcore/sctp.c libfdcore/sctps.c
diffstat 7 files changed, 103 insertions(+), 43 deletions(-) [+]
line wrap: on
line diff
--- a/include/freeDiameter/libfdcore.h	Thu May 09 12:06:03 2013 +0800
+++ b/include/freeDiameter/libfdcore.h	Thu May 09 16:40:02 2013 +0800
@@ -915,7 +915,7 @@
 		/* Hook called as soon as a message has been received from the network, after TLS & boundary processing.
 		 - {msg} is NULL.
 		 - {peer} is NULL.
-		 - {other} is a pointer to a structure { size_t len; uint8_t * buf; } containing the received buffer.
+		 - {other} is a pointer to a struct fd_cnx_rcvdata containing the received buffer.
 		 - {permsgdata} points to either a new empty structure allocated for this message (cf. fd_hook_data_hdl), or NULL if no hdl is registered.
 		 */
 		 
@@ -1024,6 +1024,12 @@
 /* A handle that will be associated with the extension, and with the permsgdata structures. */
 struct fd_hook_data_hdl;
 
+/* The following structure is what is passed to the HOOK_DATA_RECEIVED hook */
+struct fd_cnx_rcvdata {
+	size_t  length;
+	uint8_t * buffer; /* internal note: the buffer is padded with a struct fd_msg_pmdl, not accounted for in length */
+};
+
 /* Function to register a new fd_hook_data_hdl. Should be called by your extension init function.
  * The arguments are the functions called to initialize a new fd_hook_permsgdata and to free this structure when the corresponding message is being freed.
  */
--- a/include/freeDiameter/libfdproto.h	Thu May 09 12:06:03 2013 +0800
+++ b/include/freeDiameter/libfdproto.h	Thu May 09 16:40:02 2013 +0800
@@ -2552,9 +2552,9 @@
 	struct fd_list sentinel; /* if the sentinel.o field is NULL, the structure is not initialized. Otherwise it points to the cleanup function in libfdcore. */
 	pthread_mutex_t lock;
 };
-#define FD_MSG_PMDL_INITIALIZER(pmdl_ptr)        { FD_LIST_INITIALIZER(  (pmdl_ptr)->sentinel       ), PTHREAD_MUTEX_INITIALIZER }
 struct fd_msg_pmdl * fd_msg_pmdl_get(struct msg * msg);
 
+
 /***************************************/
 /*   Manage AVP values                 */
 /***************************************/
--- a/libfdcore/cnxctx.c	Thu May 09 12:06:03 2013 +0800
+++ b/libfdcore/cnxctx.c	Thu May 09 16:40:02 2013 +0800
@@ -672,6 +672,53 @@
 	return ret;
 }
 
+#define ALIGNOF(t) ((char *)(&((struct { char c; t _h; } *)0)->_h) - (char *)0)  /* Could use __alignof__(t) on some systems but this is more portable probably */
+#define PMDL_PADDED(len) ( ((len) + ALIGNOF(struct fd_msg_pmdl) - 1) & ~(ALIGNOF(struct fd_msg_pmdl) - 1) )
+
+size_t fd_msg_pmdl_sizewithoverhead(size_t datalen)
+{
+	return PMDL_PADDED(datalen);
+}
+
+struct fd_msg_pmdl * fd_msg_pmdl_get_inbuf(uint8_t * buf, size_t datalen)
+{
+	return (struct fd_msg_pmdl *)(buf + PMDL_PADDED(datalen));
+} 
+
+static int fd_cnx_init_msg_buffer(uint8_t * buffer, size_t expected_len, struct fd_msg_pmdl ** pmdl)
+{
+	*pmdl = fd_msg_pmdl_get_inbuf(buffer, expected_len);
+	fd_list_init(&(*pmdl)->sentinel, NULL);
+	CHECK_POSIX(pthread_mutex_init(&(*pmdl)->lock, NULL) );
+	return 0;
+}
+
+static uint8_t * fd_cnx_alloc_msg_buffer(size_t expected_len, struct fd_msg_pmdl ** pmdl)
+{
+	uint8_t * ret = NULL;
+	
+	CHECK_MALLOC_DO(  ret = malloc( PMDL_PADDED(expected_len) ), return NULL );
+	CHECK_FCT_DO( fd_cnx_init_msg_buffer(ret, expected_len, pmdl), {free(ret); return NULL;} );
+	return ret;
+}
+
+static uint8_t * fd_cnx_realloc_msg_buffer(uint8_t * buffer, size_t expected_len, struct fd_msg_pmdl ** pmdl)
+{
+	uint8_t * ret = NULL;
+	
+	CHECK_MALLOC_DO(  ret = realloc( buffer, PMDL_PADDED(expected_len) ), return NULL );
+	CHECK_FCT_DO( fd_cnx_init_msg_buffer(ret, expected_len, pmdl), {free(ret); return NULL;} );
+	return ret;
+}
+
+static void free_rcvdata(void * arg) 
+{
+	struct fd_cnx_rcvdata * data = arg;
+	struct fd_msg_pmdl * pmdl = fd_msg_pmdl_get_inbuf(data->buffer, data->length);
+	(void) pthread_mutex_destroy(&pmdl->lock);
+	free(data->buffer);
+}
+
 /* Receiver thread (TCP & noTLS) : incoming message is directly saved into the target queue */
 static void * rcvthr_notls_tcp(void * arg)
 {
@@ -694,8 +741,8 @@
 	/* Receive from a TCP connection: we have to rebuild the message boundaries */
 	do {
 		uint8_t header[4];
-		uint8_t * newmsg;
-		size_t  length;
+		struct fd_cnx_rcvdata rcv_data;
+		struct fd_msg_pmdl *pmdl=NULL;
 		ssize_t ret = 0;
 		size_t	received = 0;
 
@@ -708,37 +755,42 @@
 			received += ret;
 		} while (received < sizeof(header));
 
-		length = ((size_t)header[1] << 16) + ((size_t)header[2] << 8) + (size_t)header[3];
+		rcv_data.length = ((size_t)header[1] << 16) + ((size_t)header[2] << 8) + (size_t)header[3];
 
 		/* Check the received word is a valid begining of a Diameter message */
 		if ((header[0] != DIAMETER_VERSION)	/* defined in <libfdproto.h> */
-		   || (length > DIAMETER_MSG_SIZE_MAX)) { /* to avoid too big mallocs */
+		   || (rcv_data.length > DIAMETER_MSG_SIZE_MAX)) { /* to avoid too big mallocs */
 			/* The message is suspect */
-			TRACE_DEBUG(INFO, "Received suspect header [ver: %d, size: %zd], assume disconnection", (int)header[0], length);
+			LOG_E( "Received suspect header [ver: %d, size: %zd], assuming disconnection", (int)header[0], rcv_data.length);
 			fd_cnx_markerror(conn);
 			goto out; /* Stop the thread, the recipient of the event will cleanup */
 		}
 
 		/* Ok, now we can really receive the data */
-		CHECK_MALLOC_DO(  newmsg = malloc( length + sizeof(struct timespec) ), goto fatal );
-		memcpy(newmsg, header, sizeof(header));
+		CHECK_MALLOC_DO(  rcv_data.buffer = fd_cnx_alloc_msg_buffer( rcv_data.length, &pmdl ), goto fatal );
+		memcpy(rcv_data.buffer, header, sizeof(header));
 
-		while (received < length) {
-			pthread_cleanup_push(free, newmsg); /* In case we are canceled, clean the partialy built buffer */
-			ret = fd_cnx_s_recv(conn, newmsg + received, length - received);
+		while (received < rcv_data.length) {
+			pthread_cleanup_push(free_rcvdata, &rcv_data); /* In case we are canceled, clean the partialy built buffer */
+			ret = fd_cnx_s_recv(conn, rcv_data.buffer + received, rcv_data.length - received);
 			pthread_cleanup_pop(0);
 
 			if (ret <= 0) {
-				free(newmsg);
+				free_rcvdata(&rcv_data);
 				goto out;
 			}
 			received += ret;
 		}
 		
-		// fd_msg_log(....)
+		fd_hook_call(HOOK_DATA_RECEIVED, NULL, NULL, &rcv_data, pmdl);
 		
 		/* We have received a complete message, pass it to the daemon */
-		CHECK_FCT_DO( fd_event_send( fd_cnx_target_queue(conn), FDEVP_CNX_MSG_RECV, length, newmsg), /* continue or destroy everything? */);
+		CHECK_FCT_DO( fd_event_send( fd_cnx_target_queue(conn), FDEVP_CNX_MSG_RECV, rcv_data.length, rcv_data.buffer), 
+			{ 
+				free_rcvdata(&rcv_data);
+				CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), );
+				return NULL; 
+			} );
 		
 	} while (conn->cc_loop);
 	
@@ -757,8 +809,7 @@
 static void * rcvthr_notls_sctp(void * arg)
 {
 	struct cnxctx * conn = arg;
-	uint8_t * buf;
-	size_t    bufsz;
+	struct fd_cnx_rcvdata rcv_data;
 	int	  event;
 	
 	TRACE_ENTRY("%p", arg);
@@ -776,7 +827,8 @@
 	ASSERT( fd_cnx_target_queue(conn) );
 	
 	do {
-		CHECK_FCT_DO( fd_sctp_recvmeta(conn, NULL, &buf, &bufsz, &event), goto fatal );
+		struct fd_msg_pmdl *pmdl=NULL;
+		CHECK_FCT_DO( fd_sctp_recvmeta(conn, NULL, &rcv_data.buffer, &rcv_data.length, &event), goto fatal );
 		if (event == FDEVP_CNX_ERROR) {
 			fd_cnx_markerror(conn);
 			goto out;
@@ -786,8 +838,12 @@
 			/* Just ignore the notification for now, we will get another error later anyway */
 			continue;
 		}
-		/* Note: the real size of buf is bufsz + struct timespec */
-		CHECK_FCT_DO( fd_event_send( fd_cnx_target_queue(conn), event, bufsz, buf), goto fatal );
+
+		if (event == FDEVP_CNX_MSG_RECV) {
+			CHECK_MALLOC_DO( rcv_data.buffer = fd_cnx_realloc_msg_buffer(rcv_data.buffer, rcv_data.length, &pmdl), goto fatal );
+			fd_hook_call(HOOK_DATA_RECEIVED, NULL, NULL, &rcv_data, pmdl);
+		}
+		CHECK_FCT_DO( fd_event_send( fd_cnx_target_queue(conn), event, rcv_data.length, rcv_data.buffer), goto fatal );
 		
 	} while (conn->cc_loop || (event != FDEVP_CNX_MSG_RECV));
 	
@@ -927,8 +983,8 @@
 	/* No guarantee that GnuTLS preserves the message boundaries, so we re-build it as in TCP */
 	do {
 		uint8_t header[4];
-		uint8_t * newmsg;
-		size_t  length;
+		struct fd_cnx_rcvdata rcv_data;
+		struct fd_msg_pmdl *pmdl=NULL;
 		ssize_t ret = 0;
 		size_t	received = 0;
 
@@ -941,37 +997,39 @@
 			received += ret;
 		} while (received < sizeof(header));
 
-		length = ((size_t)header[1] << 16) + ((size_t)header[2] << 8) + (size_t)header[3];
+		rcv_data.length = ((size_t)header[1] << 16) + ((size_t)header[2] << 8) + (size_t)header[3];
 
 		/* Check the received word is a valid beginning of a Diameter message */
 		if ((header[0] != DIAMETER_VERSION)	/* defined in <libfreeDiameter.h> */
-		   || (length > DIAMETER_MSG_SIZE_MAX)) { /* to avoid too big mallocs */
+		   || (rcv_data.length > DIAMETER_MSG_SIZE_MAX)) { /* to avoid too big mallocs */
 			/* The message is suspect */
-			TRACE_DEBUG(INFO, "Received suspect header [ver: %d, size: %zd], assume disconnection", (int)header[0], length);
+			LOG_E( "Received suspect header [ver: %d, size: %zd], assume disconnection", (int)header[0], rcv_data.length);
 			fd_cnx_markerror(conn);
 			goto out;
 		}
 
 		/* Ok, now we can really receive the data */
-		CHECK_MALLOC(  newmsg = malloc( length + sizeof(struct timespec)) );
-		memcpy(newmsg, header, sizeof(header));
+		CHECK_MALLOC(  rcv_data.buffer = fd_cnx_alloc_msg_buffer( rcv_data.length, &pmdl ) );
+		memcpy(rcv_data.buffer, header, sizeof(header));
 
-		while (received < length) {
-			pthread_cleanup_push(free, newmsg); /* In case we are canceled, clean the partialy built buffer */
-			ret = fd_tls_recv_handle_error(conn, session, newmsg + received, length - received);
+		while (received < rcv_data.length) {
+			pthread_cleanup_push(free_rcvdata, &rcv_data); /* In case we are canceled, clean the partialy built buffer */
+			ret = fd_tls_recv_handle_error(conn, session, rcv_data.buffer + received, rcv_data.length - received);
 			pthread_cleanup_pop(0);
 
 			if (ret <= 0) {
-				free(newmsg);
+				free_rcvdata(&rcv_data);
 				goto out;
 			}
 			received += ret;
 		}
 		
+		fd_hook_call(HOOK_DATA_RECEIVED, NULL, NULL, &rcv_data, pmdl);
+		
 		/* We have received a complete message, pass it to the daemon */
-		CHECK_FCT_DO( ret = fd_event_send( fd_cnx_target_queue(conn), FDEVP_CNX_MSG_RECV, length, newmsg), 
+		CHECK_FCT_DO( ret = fd_event_send( fd_cnx_target_queue(conn), FDEVP_CNX_MSG_RECV, rcv_data.length, rcv_data.buffer), 
 			{ 
-				free(newmsg); 
+				free_rcvdata(&rcv_data);
 				CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), );
 				return ret; 
 			} );
--- a/libfdcore/cnxctx.h	Thu May 09 12:06:03 2013 +0800
+++ b/libfdcore/cnxctx.h	Thu May 09 16:40:02 2013 +0800
@@ -65,7 +65,6 @@
 		DiamId_t 			 cn;		/* If not NULL, remote certif will be checked to match this Common Name */
 		int				 mode; 		/* GNUTLS_CLIENT / GNUTLS_SERVER */
 		gnutls_session_t 		 session;	/* Session object (stream #0 in case of SCTP) */
-		struct timespec  		 recvon;	/* Timestamp of the last chunk of data received on this session -- before uncipher */
 	}		cc_tls_para;
 
 	/* If cc_proto == SCTP */
@@ -130,7 +129,6 @@
 		size_t   bufsz;
 		size_t   offset;
 	} 		 partial;	/* If the pull function did not read the full content of first message in raw, it stores it here for next read call. */
-	struct timespec  recvon;	/* Timestamp of the last chunk of data received on this stream -- before uncipher */
 	pthread_t	 thr;		/* Thread to decrypt raw data in this pair of streams */
 	gnutls_session_t session;	/* TLS context using this pair of streams -- except if strid == 0, in that case session is outside the array */
 };
--- a/libfdcore/fdcore-internal.h	Thu May 09 12:06:03 2013 +0800
+++ b/libfdcore/fdcore-internal.h	Thu May 09 16:40:02 2013 +0800
@@ -218,7 +218,7 @@
 	/* request to terminate this peer : disconnect, requeue all messages */
 	FDEVP_TERMINATE = 1500
 	
-	/* A connection object has received a message. (data contains the buffer + struct timespec piggytailed -- unaligned) */
+	/* A connection object has received a message. (data contains the buffer + padding + struct fd_msg_pmdl) */
 	,FDEVP_CNX_MSG_RECV
 			 
 	/* A connection object has encountered an error (disconnected). */
@@ -271,7 +271,6 @@
 	int  		  validate;	/* The peer is new, it must be validated (by an extension) or error CEA to be sent */
 };
 
-
 /* Functions */
 int  fd_peer_fini();
 int  fd_peer_alloc(struct fd_peer ** ptr);
@@ -366,4 +365,7 @@
 void   fd_hook_call(enum fd_hook_type type, struct msg * msg, struct fd_peer * peer, void * other, struct fd_msg_pmdl * pmdl);
 void   fd_hook_associate(struct msg * msg, struct fd_msg_pmdl * pmdl);
 int    fd_hooks_init(void);
+size_t fd_msg_pmdl_sizewithoverhead(size_t datalen);
+struct fd_msg_pmdl * fd_msg_pmdl_get_inbuf(uint8_t * buf, size_t datalen); 
+
 #endif /* _FDCORE_INTERNAL_H */
--- a/libfdcore/sctp.c	Thu May 09 12:06:03 2013 +0800
+++ b/libfdcore/sctp.c	Thu May 09 16:40:02 2013 +0800
@@ -1085,7 +1085,7 @@
 	
 	/* We will loop while all data is not received. */
 incomplete:
-	while (datasize + sizeof(struct timespec) >= bufsz ) {
+	while (datasize >= bufsz ) {
 		/* The buffer is full, enlarge it */
 		bufsz += mempagesz;
 		CHECK_MALLOC( data = realloc(data, bufsz ) );
@@ -1093,7 +1093,7 @@
 	/* the new data will be received following the preceding */
 	memset(&iov,  0, sizeof(iov));
 	iov.iov_base = data + datasize ;
-	iov.iov_len  = bufsz - sizeof(struct timespec) - datasize;
+	iov.iov_len  = bufsz - datasize;
 
 	/* Receive data from the socket */
 again:
--- a/libfdcore/sctps.c	Thu May 09 12:06:03 2013 +0800
+++ b/libfdcore/sctps.c	Thu May 09 16:40:02 2013 +0800
@@ -91,7 +91,6 @@
 			case FDEVP_CNX_MSG_RECV:
 				/* Demux this message to the appropriate fifo, another thread will pull, gnutls process, and send to target queue */
 				if (strid < conn->cc_sctp_para.pairs) {
-					/* Note, here the timespec is piggytailed to buf */
 					CHECK_FCT_DO(fd_event_send(conn->cc_sctps_data.array[strid].raw_recv, event, bufsz, buf), goto fatal );
 				} else {
 					TRACE_DEBUG(INFO, "Received packet (%zd bytes) on out-of-range stream #%d from %s, discarded.", bufsz, strid, conn->cc_remid);
@@ -195,9 +194,6 @@
 			/* Documentations says to return 0 on connection closed, but it does hang within gnutls_handshake */
 			return -1;
 		}
-		if (ev == FDEVP_CNX_MSG_RECV) {
-			memcpy(&ctx->recvon, ctx->partial.buf + ctx->partial.bufsz, sizeof(struct timespec)); /* retrieve piggy-tailed ts */
-		}
 	}
 		
 	pulled = ctx->partial.bufsz - ctx->partial.offset;
"Welcome to our mercurial repository"