changeset 1377:ce257e43085d

fd_fifo_del: check if queue is already (being) destroyed and return success in that case. Improves shutdown behaviour.
author Thomas Klausner <tk@giga.or.at>
date Thu, 20 Jun 2019 12:01:50 +0200
parents b8afaf63f65a
children 86e231b3d6fc
files libfdproto/fifo.c
diffstat 1 files changed, 137 insertions(+), 132 deletions(-) [+]
line wrap: on
line diff
--- a/libfdproto/fifo.c	Thu Jun 20 11:33:29 2019 +0200
+++ b/libfdproto/fifo.c	Thu Jun 20 12:01:50 2019 +0200
@@ -50,18 +50,18 @@
 /* Definition of a FIFO queue object */
 struct fifo {
 	int		eyec;	/* An eye catcher, also used to check a queue is valid. FIFO_EYEC */
-	
+
 	pthread_mutex_t	mtx;	/* Mutex protecting this queue */
 	pthread_cond_t	cond_pull;	/* condition variable for pulling threads */
 	pthread_cond_t	cond_push;	/* condition variable for pushing threads */
-	
+
 	struct fd_list	list;	/* sentinel for the list of elements */
 	int		count;	/* number of objects in the list */
 	int		thrs;	/* number of threads waiting for a new element (when count is 0) */
-	
+
 	int 		max;	/* maximum number of items to accept if not 0 */
 	int		thrs_push; /* number of threads waitnig to push an item */
-	
+
 	uint16_t	high;	/* High level threshold (see libfreeDiameter.h for details) */
 	uint16_t	low;	/* Low level threshhold */
 	void 		*data;	/* Opaque pointer for threshold callbacks */
@@ -69,12 +69,12 @@
 	void		(*l_cb)(struct fifo *, void **);
 	int 		highest;/* The highest count value for which h_cb has been called */
 	int		highest_ever; /* The max count value this queue has reached (for tweaking) */
-	
+
 	long long	total_items;   /* Cumulated number of items that went through this fifo (excluding current count), always increasing. */
 	struct timespec total_time;    /* Cumulated time all items spent in this queue, including blocking time (always growing, use deltas for monitoring) */
 	struct timespec blocking_time; /* Cumulated time threads trying to post new items were blocked (queue full). */
 	struct timespec last_time;     /* For the last element retrieved from the queue, how long it take between posting (including blocking) and poping */
-	
+
 };
 
 struct fifo_item {
@@ -93,25 +93,25 @@
 int fd_fifo_new ( struct fifo ** queue, int max )
 {
 	struct fifo * new;
-	
+
 	TRACE_ENTRY( "%p", queue );
-	
+
 	CHECK_PARAMS( queue );
-	
+
 	/* Create a new object */
 	CHECK_MALLOC( new = malloc (sizeof (struct fifo) )  );
-	
+
 	/* Initialize the content */
 	memset(new, 0, sizeof(struct fifo));
-	
+
 	new->eyec = FIFO_EYEC;
 	CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) );
 	CHECK_POSIX( pthread_cond_init(&new->cond_pull, NULL) );
 	CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) );
 	new->max = max;
-	
+
 	fd_list_init(&new->list, NULL);
-	
+
 	/* We're done */
 	*queue = new;
 	return 0;
@@ -121,92 +121,97 @@
 DECLARE_FD_DUMP_PROTOTYPE(fd_fifo_dump, char * name, struct fifo * queue, fd_fifo_dump_item_cb dump_item)
 {
 	FD_DUMP_HANDLE_OFFSET();
-	
+
 	if (name) {
-		CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "'%s'(@%p): ", name, queue), return NULL);	
+		CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "'%s'(@%p): ", name, queue), return NULL);
 	} else {
 		CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "{fifo}(@%p): ", queue), return NULL);
 	}
-	
+
 	if (!CHECK_FIFO( queue )) {
 		return fd_dump_extend(FD_DUMP_STD_PARAMS, "INVALID/NULL");
 	}
-	
+
 	CHECK_POSIX_DO(  pthread_mutex_lock( &queue->mtx ), /* continue */  );
-	CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "items:%d,%d,%d threads:%d,%d stats:%lld/%ld.%06ld,%ld.%06ld,%ld.%06ld thresholds:%d,%d,%d,%p,%p,%p", 
+	CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "items:%d,%d,%d threads:%d,%d stats:%lld/%ld.%06ld,%ld.%06ld,%ld.%06ld thresholds:%d,%d,%d,%p,%p,%p",
 						queue->count, queue->highest_ever, queue->max,
 						queue->thrs, queue->thrs_push,
 						queue->total_items,(long)queue->total_time.tv_sec,(long)(queue->total_time.tv_nsec/1000),(long)queue->blocking_time.tv_sec,(long)(queue->blocking_time.tv_nsec/1000),(long)queue->last_time.tv_sec,(long)(queue->last_time.tv_nsec/1000),
-						queue->high, queue->low, queue->highest, queue->h_cb, queue->l_cb, queue->data), 
+						queue->high, queue->low, queue->highest, queue->h_cb, queue->l_cb, queue->data),
 			 goto error);
-	
+
 	if (dump_item) {
 		struct fd_list * li;
 		int i = 0;
 		for (li = queue->list.next; li != &queue->list; li = li->next) {
 			struct fifo_item * fi = (struct fifo_item *)li;
-			CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n [#%i](@%p)@%ld.%06ld: ", 
-						i++, fi->item.o, (long)fi->posted_on.tv_sec,(long)(fi->posted_on.tv_nsec/1000)), 
+			CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n [#%i](@%p)@%ld.%06ld: ",
+						i++, fi->item.o, (long)fi->posted_on.tv_sec,(long)(fi->posted_on.tv_nsec/1000)),
 					 goto error);
 			CHECK_MALLOC_DO( (*dump_item)(FD_DUMP_STD_PARAMS, fi->item.o), goto error);
 		}
 	}
 	CHECK_POSIX_DO(  pthread_mutex_unlock( &queue->mtx ), /* continue */  );
-	
+
 	return *buf;
 error:
 	CHECK_POSIX_DO(  pthread_mutex_unlock( &queue->mtx ), /* continue */  );
 	return NULL;
 }
 
-/* Delete a queue. It must be empty. */ 
+/* Delete a queue. It must be empty. */
 int fd_fifo_del ( struct fifo  ** queue )
 {
 	struct fifo * q;
 	int loops = 0;
-	
+
 	TRACE_ENTRY( "%p", queue );
 
+	if (queue && *queue == NULL) {
+		/* Queue already (in the process of being) deleted */
+		return 0;
+	}
+
 	CHECK_PARAMS( queue && CHECK_FIFO( *queue ) );
-	
+
 	q = *queue;
-	
+
 	CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
-	
+
 	if ((q->count != 0) || (q->data != NULL)) {
 		TRACE_DEBUG(INFO, "The queue cannot be destroyed (%d, %p)", q->count, q->data);
 		CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ), /* no fallback */  );
 		return EINVAL;
 	}
-	
+
 	/* Ok, now invalidate the queue */
 	q->eyec = 0xdead;
-	
+
 	/* Have all waiting threads return an error */
 	while (q->thrs) {
 		CHECK_POSIX(  pthread_mutex_unlock( &q->mtx ));
 		CHECK_POSIX(  pthread_cond_signal(&q->cond_pull)  );
 		usleep(1000);
-		
+
 		CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
 		ASSERT( ++loops < 20 ); /* detect infinite loops */
 	}
-	
+
 	/* sanity check */
 	ASSERT(FD_IS_LIST_EMPTY(&q->list));
-	
+
 	/* And destroy it */
 	CHECK_POSIX(  pthread_mutex_unlock( &q->mtx )  );
-	
+
 	CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond_pull ),  );
-	
+
 	CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond_push ),  );
-	
+
 	CHECK_POSIX_DO(  pthread_mutex_destroy( &q->mtx ),  );
-	
+
 	free(q);
 	*queue = NULL;
-	
+
 	return 0;
 }
 
@@ -214,110 +219,110 @@
 int fd_fifo_move ( struct fifo * old, struct fifo * new, struct fifo ** loc_update )
 {
 	int loops = 0;
-	
+
 	TRACE_ENTRY("%p %p %p", old, new, loc_update);
 	CHECK_PARAMS( CHECK_FIFO( old ) && CHECK_FIFO( new ));
-	
+
 	CHECK_PARAMS( ! old->data );
 	if (new->high) {
 		TODO("Implement support for thresholds in fd_fifo_move...");
 	}
-	
+
 	/* Update loc_update */
 	if (loc_update)
 		*loc_update = new;
-	
+
 	/* Lock the queues */
 	CHECK_POSIX(  pthread_mutex_lock( &old->mtx )  );
-	
+
 	CHECK_PARAMS_DO( (! old->thrs_push), {
 			pthread_mutex_unlock( &old->mtx );
 			return EINVAL;
 		} );
-	
+
 	CHECK_POSIX(  pthread_mutex_lock( &new->mtx )  );
-	
+
 	/* Any waiting thread on the old queue returns an error */
 	old->eyec = 0xdead;
 	while (old->thrs) {
 		CHECK_POSIX(  pthread_mutex_unlock( &old->mtx ));
 		CHECK_POSIX(  pthread_cond_signal( &old->cond_pull )  );
 		usleep(1000);
-		
+
 		CHECK_POSIX(  pthread_mutex_lock( &old->mtx )  );
 		ASSERT( loops < 20 ); /* detect infinite loops */
 	}
-	
+
 	/* Move all data from old to new */
 	fd_list_move_end( &new->list, &old->list );
 	if (old->count && (!new->count)) {
 		CHECK_POSIX(  pthread_cond_signal(&new->cond_pull)  );
 	}
 	new->count += old->count;
-	
+
 	/* Reset old */
 	old->count = 0;
 	old->eyec = FIFO_EYEC;
-	
+
 	/* Merge the stats in the new queue */
 	new->total_items += old->total_items;
 	old->total_items = 0;
-	
+
 	new->total_time.tv_nsec += old->total_time.tv_nsec;
 	new->total_time.tv_sec += old->total_time.tv_sec + (new->total_time.tv_nsec / 1000000000);
 	new->total_time.tv_nsec %= 1000000000;
 	old->total_time.tv_nsec = 0;
 	old->total_time.tv_sec = 0;
-	
+
 	new->blocking_time.tv_nsec += old->blocking_time.tv_nsec;
 	new->blocking_time.tv_sec += old->blocking_time.tv_sec + (new->blocking_time.tv_nsec / 1000000000);
 	new->blocking_time.tv_nsec %= 1000000000;
 	old->blocking_time.tv_nsec = 0;
 	old->blocking_time.tv_sec = 0;
-	
+
 	/* Unlock, we're done */
 	CHECK_POSIX(  pthread_mutex_unlock( &new->mtx )  );
 	CHECK_POSIX(  pthread_mutex_unlock( &old->mtx )  );
-	
+
 	return 0;
 }
 
 /* Get the information on the queue */
-int fd_fifo_getstats( struct fifo * queue, int * current_count, int * limit_count, int * highest_count, long long * total_count, 
+int fd_fifo_getstats( struct fifo * queue, int * current_count, int * limit_count, int * highest_count, long long * total_count,
 				           struct timespec * total, struct timespec * blocking, struct timespec * last)
 {
 	TRACE_ENTRY( "%p %p %p %p %p %p %p %p", queue, current_count, limit_count, highest_count, total_count, total, blocking, last);
-	
+
 	/* Check the parameters */
 	CHECK_PARAMS( CHECK_FIFO( queue ) );
-	
+
 	/* lock the queue */
 	CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
-	
+
 	if (current_count)
 		*current_count = queue->count;
-	
+
 	if (limit_count)
 		*limit_count = queue->max;
-	
+
 	if (highest_count)
 		*highest_count = queue->highest_ever;
-	
+
 	if (total_count)
 		*total_count = queue->total_items;
-	
+
 	if (total)
 		memcpy(total, &queue->total_time, sizeof(struct timespec));
-	
+
 	if (blocking)
 		memcpy(blocking, &queue->blocking_time, sizeof(struct timespec));
-	
+
 	if (last)
 		memcpy(last, &queue->last_time, sizeof(struct timespec));
-	
+
 	/* Unlock */
 	CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
-	
+
 	/* Done */
 	return 0;
 }
@@ -328,7 +333,7 @@
 {
 	if ( !CHECK_FIFO( queue ) )
 		return 0;
-	
+
 	return queue->count; /* Let's hope it's read atomically, since we are not locking... */
 }
 
@@ -336,23 +341,23 @@
 int fd_fifo_setthrhd ( struct fifo * queue, void * data, uint16_t high, void (*h_cb)(struct fifo *, void **), uint16_t low, void (*l_cb)(struct fifo *, void **) )
 {
 	TRACE_ENTRY( "%p %p %hu %p %hu %p", queue, data, high, h_cb, low, l_cb );
-	
+
 	/* Check the parameters */
 	CHECK_PARAMS( CHECK_FIFO( queue ) && (high > low) && (queue->data == NULL) );
-	
+
 	/* lock the queue */
 	CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
-	
+
 	/* Save the values */
 	queue->high = high;
 	queue->low  = low;
 	queue->data = data;
 	queue->h_cb = h_cb;
 	queue->l_cb = l_cb;
-	
+
 	/* Unlock */
 	CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
-	
+
 	/* Done */
 	return 0;
 }
@@ -363,13 +368,13 @@
 {
 	struct fifo * q = (struct fifo *)queue;
 	TRACE_ENTRY( "%p", queue );
-	
+
 	/* The thread has been cancelled, therefore it does not wait on the queue anymore */
 	q->thrs_push--;
-	
+
 	/* Now unlock the queue, and we're done */
 	CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ),  /* nothing */  );
-	
+
 	/* End of cleanup handler */
 	return;
 }
@@ -381,37 +386,37 @@
 	struct fifo_item * new;
 	int call_cb = 0;
 	struct timespec posted_on, queued_on;
-	
+
 	/* Get the timing of this call */
 	CHECK_SYS(  clock_gettime(CLOCK_REALTIME, &posted_on)  );
-	
+
 	/* lock the queue */
 	CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
-	
+
 	if ((!skip_max) && (queue->max)) {
 		while (queue->count >= queue->max) {
 			int ret = 0;
-			
+
 			/* We have to wait for an item to be pulled */
 			queue->thrs_push++ ;
 			pthread_cleanup_push( fifo_cleanup_push, queue);
 			ret = pthread_cond_wait( &queue->cond_push, &queue->mtx );
 			pthread_cleanup_pop(0);
 			queue->thrs_push-- ;
-			
+
 			ASSERT( ret == 0 );
 		}
 	}
-	
+
 	/* Create a new list item */
 	CHECK_MALLOC_DO(  new = malloc (sizeof (struct fifo_item)) , {
 			pthread_mutex_unlock( &queue->mtx );
 			return ENOMEM;
 		} );
-	
+
 	fd_list_init(&new->item, *item);
 	*item = NULL;
-	
+
 	/* Add the new item at the end */
 	fd_list_insert_before( &queue->list, &new->item);
 	queue->count++;
@@ -421,10 +426,10 @@
 		call_cb = 1;
 		queue->highest = queue->count;
 	}
-	
+
 	/* store timing */
 	memcpy(&new->posted_on, &posted_on, sizeof(struct timespec));
-	
+
 	/* update queue timing info "blocking time" */
 	{
 		long long blocked_ns;
@@ -435,7 +440,7 @@
 		queue->blocking_time.tv_sec += blocked_ns / 1000000000;
 		queue->blocking_time.tv_nsec = blocked_ns % 1000000000;
 	}
-	
+
 	/* Signal if threads are asleep */
 	if (queue->thrs > 0) {
 		CHECK_POSIX(  pthread_cond_signal(&queue->cond_pull)  );
@@ -444,14 +449,14 @@
 		/* cascade */
 		CHECK_POSIX(  pthread_cond_signal(&queue->cond_push)  );
 	}
-	
+
 	/* Unlock */
 	CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
-	
+
 	/* Call high-watermark cb as needed */
 	if (call_cb && queue->h_cb)
 		(*queue->h_cb)(queue, &queue->data);
-	
+
 	/* Done */
 	return 0;
 }
@@ -460,24 +465,24 @@
 int fd_fifo_post_int ( struct fifo * queue, void ** item )
 {
 	TRACE_ENTRY( "%p %p", queue, item );
-	
+
 	/* Check the parameters */
 	CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );
-	
+
 	return fd_fifo_post_internal ( queue,item, 0 );
-	
+
 }
 
 /* Post a new item in the queue, not blocking */
 int fd_fifo_post_noblock ( struct fifo * queue, void ** item )
 {
 	TRACE_ENTRY( "%p %p", queue, item );
-	
+
 	/* Check the parameters */
 	CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );
-	
+
 	return fd_fifo_post_internal ( queue,item, 1 );
-	
+
 }
 
 /* Pop the first item from the queue */
@@ -486,35 +491,35 @@
 	void * ret = NULL;
 	struct fifo_item * fi;
 	struct timespec now;
-	
+
 	ASSERT( ! FD_IS_LIST_EMPTY(&queue->list) );
-	
+
 	fi = (struct fifo_item *)(queue->list.next);
 	ret = fi->item.o;
 	fd_list_unlink(&fi->item);
 	queue->count--;
 	queue->total_items++;
-	
+
 	/* Update the timings */
 	CHECK_SYS_DO(  clock_gettime(CLOCK_REALTIME, &now), goto skip_timing  );
 	{
 		long long elapsed = (now.tv_sec - fi->posted_on.tv_sec) * 1000000000;
 		elapsed += now.tv_nsec - fi->posted_on.tv_nsec;
-		
+
 		queue->last_time.tv_sec = elapsed / 1000000000;
 		queue->last_time.tv_nsec = elapsed % 1000000000;
-		
+
 		elapsed += queue->total_time.tv_nsec;
 		queue->total_time.tv_sec += elapsed / 1000000000;
 		queue->total_time.tv_nsec = elapsed % 1000000000;
 	}
-skip_timing:	
+skip_timing:
 	free(fi);
-	
+
 	if (queue->thrs_push) {
 		CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), );
 	}
-	
+
 	return ret;
 }
 
@@ -523,12 +528,12 @@
 {
 	if ((queue->high == 0) || (queue->low == 0) || (queue->l_cb == 0))
 		return 0;
-	
+
 	if (((queue->count % queue->high) == queue->low) && (queue->highest > queue->count)) {
 		queue->highest -= queue->high;
 		return 1;
 	}
-	
+
 	return 0;
 }
 
@@ -537,15 +542,15 @@
 {
 	int wouldblock = 0;
 	int call_cb = 0;
-	
+
 	TRACE_ENTRY( "%p %p", queue, item );
-	
+
 	/* Check the parameters */
 	CHECK_PARAMS( CHECK_FIFO( queue ) && item );
-	
+
 	/* lock the queue */
 	CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
-	
+
 	/* Check queue status */
 	if (queue->count > 0) {
 got_item:
@@ -562,18 +567,18 @@
 			if (queue->count > 0)
 				goto got_item;
 		}
-		
+
 		wouldblock = 1;
 		*item = NULL;
 	}
-		
+
 	/* Unlock */
 	CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
-	
+
 	/* Call low watermark callback as needed */
 	if (call_cb)
 		(*queue->l_cb)(queue, &queue->data);
-	
+
 	/* Done */
 	return wouldblock ? EWOULDBLOCK : 0;
 }
@@ -583,13 +588,13 @@
 {
 	struct fifo * q = (struct fifo *)queue;
 	TRACE_ENTRY( "%p", queue );
-	
+
 	/* The thread has been cancelled, therefore it does not wait on the queue anymore */
 	q->thrs--;
-	
+
 	/* Now unlock the queue, and we're done */
 	CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ),  /* nothing */  );
-	
+
 	/* End of cleanup handler */
 	return;
 }
@@ -599,16 +604,16 @@
 {
 	int call_cb = 0;
 	int ret = 0;
-	
+
 	/* Check the parameters */
 	CHECK_PARAMS( CHECK_FIFO( queue ) && item && (abstime || !istimed) );
-	
+
 	/* Initialize the return value */
 	*item = NULL;
-	
+
 	/* lock the queue */
 	CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
-	
+
 awaken:
 	/* Check queue status */
 	if (!CHECK_FIFO( queue )) {
@@ -617,7 +622,7 @@
 		TRACE_DEBUG(FULL, "The queue is being destroyed -> EPIPE");
 		return EPIPE;
 	}
-	
+
 	if (queue->count > 0) {
 		/* There are items in the queue, so pick the first one */
 		*item = mq_pop(queue);
@@ -635,17 +640,17 @@
 		queue->thrs-- ;
 		if (ret == 0)
 			goto awaken;  /* test for spurious wake-ups */
-		
+
 		/* otherwise (ETIMEDOUT / other error) just continue */
 	}
-	
+
 	/* Unlock */
 	CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
-	
+
 	/* Call low watermark callback as needed */
 	if (call_cb)
 		(*queue->l_cb)(queue, &queue->data);
-	
+
 	/* Done */
 	return ret;
 }
@@ -669,13 +674,13 @@
 {
 	int ret = 0;
 	TRACE_ENTRY( "%p %p", queue, abstime );
-	
+
 	CHECK_PARAMS_DO( CHECK_FIFO( queue ), return -EINVAL );
-	
+
 	/* lock the queue */
 	CHECK_POSIX_DO(  pthread_mutex_lock( &queue->mtx ), return -__ret__  );
-	
-awaken:	
+
+awaken:
 	ret = (queue->count > 0 ) ? queue->count : 0;
 	if ((ret == 0) && (abstime != NULL)) {
 		/* We have to wait for a new item */
@@ -686,15 +691,15 @@
 		queue->thrs-- ;
 		if (ret == 0)
 			goto awaken;  /* test for spurious wake-ups */
-		
+
 		if (ret == ETIMEDOUT)
 			ret = 0;
-		else 
+		else
 			ret = -ret;
 	}
-	
+
 	/* Unlock */
 	CHECK_POSIX_DO(  pthread_mutex_unlock( &queue->mtx ), return -__ret__  );
-	
+
 	return ret;
 }
"Welcome to our mercurial repository"