view libfdproto/fifo.c @ 1481:c9e9f8a71946

Update to 3GPP TS 32.299 V15.7.0 (2019-06) Add AVPs: - 3GPP-OC-Rating-Group, Unsigned32, code 1321 - 3GPP-OC-Request-Type, Enumerated, code 1322 - 3GPP-OC-Specific-Reduction, Grouped, code 1320 - 3GPP-PS-Data-Off-Status-32.299, Enumerated, code 4406 - API-Content, UTF8String, code 1309 - API-Direction, Enumerated, code 1310 - API-Identifier, OctetString, code 1311 - API-Invocation-Timestamp, Time, code 1312 - API-Network-Service-Node, Enumerated, code 1315 - API-Result-Code, Unsigned32, code 1313 - API-Size, Unsigned64, code 1314 - APN-Rate-Control, Grouped, code 3933 - APN-Rate-Control-Downlink, Grouped, code 3934 - APN-Rate-Control-Uplink, Grouped, code 3935 - Access-Network-Info-Change, Grouped, code 4401 - Additional-Exception-Reports, Enumerated, code 3936 - Announcement-Identifier, Unsigned32, code 3905 - Announcement-Information, Grouped, code 3904 - Announcement-Order, Unsigned32, code 3906 - Announcing-PLMN-ID, UTF8String, code 4408 - Announcing-UE-HPLMN-Identifier, UTF8String, code 3426 - Announcing-UE-VPLMN-Identifier, UTF8String, code 3427 - Application-Specific-Data, OctetString, code 3458 - Authorised-QoS, UTF8String, code 849 - BSSID, UTF8String, code 2716 - Basic-Service-Code, Grouped, code 3411 - Bearer-Capability, OctetString, code 3412 - CN-Operator-Selection-Entity, Enumerated, code 3421 - CP-CIoT-EPS-Optimisation-Indicator, Enumerated, code 3930 - CPDT-Information, Grouped, code 3927 - Called-Identity, UTF8String, code 3916 - Called-Identity-Change, Grouped, code 3917 - Cellular-Network-Information, OctetString, code 3924 - Charging-Per-IP-CAN-Session-Indicator, Enumerated, code 4400 - Civic-Address-Information, UTF8String, code 1305 - Coverage-Info, Grouped, code 3459 - Coverage-Status, Enumerated, code 3428 - Discoveree-UE-HPLMN-Identifier, UTF8String, code 4402 - Discoveree-UE-VPLMN-Identifier, UTF8String, code 4403 - Discoverer-UE-HPLMN-Identifier, UTF8String, code 4404 - Discoverer-UE-VPLMN-Identifier, UTF8String, code 4405 - EPDG-Address, 
Address, code 3425 - Enhanced-Diagnostics, Grouped, code 3901 - Exposure-Function-API-Information, Grouped, code 1316 - FE-Identifier-List, UTF8String, code 4413 - Forwarding-Pending, Enumerated, code 3415 - IMS-Visited-Network-Identifier, UTF8String, code 2713 - ISUP-Cause, Grouped, code 3416 - ISUP-Cause-Diagnostics, OctetString, code 3422 - ISUP-Cause-Location, Unsigned32, code 3423 - ISUP-Cause-Value, Unsigned32, code 3424 - ISUP-Location-Number, OctetString, code 3414 - Instance-Id, UTF8String, code 3402 - Inter-UE-Transfer, Enumerated, code 3902 - Language, UTF8String, code 3914 - Layer-2-Group-ID, OctetString, code 3429 - Location-Info, Grouped, code 3460 - MBMS-Charged-Party, Enumerated, code 2323 - MSC-Address, OctetString, code 3417 - MTC-IWF-Address, Address, code 3406 - Monitored-PLMN-Identifier, UTF8String, code 3430 - Monitoring-Event-Configuration-Activity, Integer32, code 3919 - Monitoring-Event-Functionality, Integer32, code 3922 - Monitoring-Event-Information, Grouped, code 3921 - Monitoring-Event-Report-Data, Grouped, code 3920 - Monitoring-Event-Report-Number, Unsigned32, code 3923 - Monitoring-UE-HPLMN-Identifier, UTF8String, code 3431 - Monitoring-UE-Identifier, UTF8String, code 3432 - Monitoring-UE-VPLMN-Identifier, UTF8String, code 3433 - NIDD-Submission, Grouped, code 3928 - Network-Call-Reference-Number, OctetString, code 3418 - PC3-Control-Protocol-Cause, Integer32, code 3434 - PC3-EPC-Control-Protocol-Cause, Integer32, code 3435 - PC5-Radio-Technology, Enumerated, code 1300 - Play-Alternative, Enumerated, code 3913 - Privacy-Indicator, Enumerated, code 3915 - ProSe-3rd-Party-Application-ID, UTF8String, code 3440 - ProSe-Direct-Communication-Reception-Data-Container, Grouped, code 3461 - ProSe-Direct-Communication-Transmission-Data-Container, Grouped, code 3441 - ProSe-Direct-Discovery-Model, Enumerated, code 3442 - ProSe-Event-Type, Enumerated, code 3443 - ProSe-Function-IP-Address, Address, code 3444 - ProSe-Function-PLMN-Identifier, 
UTF8String, code 3457 - ProSe-Functionality, Enumerated, code 3445 - ProSe-Group-IP-Multicast-Address, Address, code 3446 - ProSe-Information, Grouped, code 3447 - ProSe-Range-Class, Enumerated, code 3448 - ProSe-Reason-For-Cancellation, Enumerated, code 3449 - ProSe-Request-Timestamp, Time, code 3450 - ProSe-Role-Of-UE, Enumerated, code 3451 - ProSe-Source-IP-Address, Address, code 3452 - ProSe-Target-Layer-2-ID, OctetString, code 4410 - ProSe-UE-ID, OctetString, code 3453 - ProSe-UE-to-Network-Relay-UE-ID, OctetString, code 4409 - Proximity-Alert-Indication, Enumerated, code 3454 - Proximity-Alert-Timestamp, Time, code 3455 - Proximity-Cancellation-Timestamp, Time, code 3456 - Quota-Indicator, Enumerated, code 3912 - RAN-End-Time, Time, code 1301 - RAN-Secondary-RAT-Usage-Report, Grouped, code 1302 - RAN-Start-Time, Time, code 1303 - Radio-Frequency, OctetString, code 3462 - Radio-Parameter-Set-Info, Grouped, code 3463 - Radio-Parameter-Set-Values, OctetString, code 3464 - Radio-Resources-Indicator, Integer32, code 3465 - Rate-Control-Max-Message-Size, Unsigned32, code 3937 - Rate-Control-Max-Rate, Unsigned32, code 3938 - Rate-Control-Time-Unit, Unsigned32, code 3939 - Reason-Header, UTF8String, code 3401 - Related-Change-Condition-Information, Grouped, code 3925 - Related-IMS-Charging-Identifier, UTF8String, code 2711 - Related-IMS-Charging-Identifier-Node, Address, code 2712 - Related-Trigger, Grouped, code 3926 - Relay-IP-address, Address, code 4411 - Requested-PLMN-Identifier, UTF8String, code 3436 - Requestor-PLMN-Identifier, UTF8String, code 3437 - Role-Of-ProSe-Function, Enumerated, code 3438 - Route-Header-Received, UTF8String, code 3403 - Route-Header-Transmitted, UTF8String, code 3404 - SCEF-Address, Address, code 1317 - SCS-AS-Address, Grouped, code 3940 - SCS-Address, Address, code 3941 - SCS-Realm, DiameterIdentity, code 3942 - SGi-PtP-Tunnelling-Method, Enumerated, code 3931 - SM-Device-Trigger-Indicator, Enumerated, code 3407 - 
SM-Device-Trigger-Information, Grouped, code 3405 - SM-Sequence-Number, Unsigned32, code 3408 - SMS-Result, Unsigned32, code 3409 - Secondary-RAT-Type, OctetString, code 1304 - Serving-Node-Identity, DiameterIdentity, code 3929 - Start-of-Charging, Time, code 3419 - TAD-Identifier, Enumerated, code 2717 - TLTRI, Unsigned32, code 1318 - TWAG-Address, Address, code 3903 - TWAN-User-Location-Info, Grouped, code 2714 - Target-IP-Address, Address, code 4412 - Teleservice, OctetString, code 3413 - Time-First-Reception, Time, code 3466 - Time-First-Transmission, Time, code 3467 - Time-Indicator, Unsigned32, code 3911 - Transmitter-Info, Grouped, code 3468 - UNI-PDU-CP-Only-Flag, Enumerated, code 3932 - UWAN-User-Location-Info, Grouped, code 3918 - Unused-Quota-Timer, Unsigned32, code 4407 - Usage-Information-Report-Sequence-Number, Integer32, code 3439 - VCS-Information, Grouped, code 3410 - VLR-Number, OctetString, code 3420 - Variable-Part, Grouped, code 3907 - Variable-Part-Order, Unsigned32, code 3908 - Variable-Part-Type, Unsigned32, code 3909 - Variable-Part-Value, UTF8String, code 3910 - WLAN-Operator-Id, Grouped, code 1306 - WLAN-Operator-Name, UTF8String, code 1307 - WLAN-PLMN-Id, UTF8String, code 1308 3GPP TS 32.299 V11.8.0 (2013-07) renamed LCS-Requestor-Id (1239) to LCS-Requestor-ID (1239). 3GPP TS 32.299 V11.8.0 (2013-07) renamed LCS-Requestor-Id-String (1240) to LCS-Requestor-ID-String (1240). 3GPP TS 32.299 V13.1.0 (2015-06) renamed PoC-User-Role-info-Units (1254) to PoC-User-Role-Info-Units (1254). 3GPP TS 32.299 V11.10.0 (2013-12) renamed Status (2702) to Status-Code (2702), and then 3GPP TS 32.299 V11.14.0 (2014-12) renamed Status-Code (2702) to Status-AS-Code (2702).
author Luke Mewburn <luke@mewburn.net>
date Thu, 26 Mar 2020 15:26:18 +1100
parents 3cbe458fbfa9
children d25ce064c667
line wrap: on
line source

/*********************************************************************************************************
* Software License Agreement (BSD License)                                                               *
* Author: Sebastien Decugis <sdecugis@freediameter.net>							 *
*													 *
* Copyright (c) 2013, WIDE Project and NICT								 *
* All rights reserved.											 *
* 													 *
* Redistribution and use of this software in source and binary forms, with or without modification, are  *
* permitted provided that the following conditions are met:						 *
* 													 *
* * Redistributions of source code must retain the above 						 *
*   copyright notice, this list of conditions and the 							 *
*   following disclaimer.										 *
*    													 *
* * Redistributions in binary form must reproduce the above 						 *
*   copyright notice, this list of conditions and the 							 *
*   following disclaimer in the documentation and/or other						 *
*   materials provided with the distribution.								 *
* 													 *
* * Neither the name of the WIDE Project or NICT nor the 						 *
*   names of its contributors may be used to endorse or 						 *
*   promote products derived from this software without 						 *
*   specific prior written permission of WIDE Project and 						 *
*   NICT.												 *
* 													 *
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED *
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A *
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR *
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 	 *
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS 	 *
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR *
* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF   *
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.								 *
*********************************************************************************************************/

/* FIFO queues module.
 *
 * The threads that call these functions must be in the cancellation state PTHREAD_CANCEL_ENABLE and type PTHREAD_CANCEL_DEFERRED.
 * This is the default state and type on thread creation.
 *
 * In order to properly destroy a queue, the application must:
 *  -> first shut down any process that can add items to the queue.
 *  -> pthread_cancel any thread that could be waiting on the queue.
 *  -> consume any element that is still in the queue, using fd_fifo_tryget_int.
 *  -> then destroy the queue using fd_fifo_del.
 */

#include "fdproto-internal.h"

/* Definition of a FIFO queue object.
 * All fields other than eyec are meant to be accessed with mtx held (fd_fifo_length
 * deliberately reads count without locking). */
struct fifo {
	int		eyec;	/* An eye catcher, also used to check a queue is valid: FIFO_EYEC while valid, 0xdead while being deleted or moved */

	pthread_mutex_t	mtx;	/* Mutex protecting this queue */
	pthread_cond_t	cond_pull;	/* condition variable for pulling threads (signalled when an item is posted) */
	pthread_cond_t	cond_push;	/* condition variable for pushing threads (signalled when an item is popped) */

	struct fd_list	list;	/* sentinel for the list of elements (each element is a struct fifo_item) */
	int		count;	/* number of objects in the list */
	int		thrs;	/* number of threads waiting for a new element (when count is 0) */

	int 		max;	/* maximum number of items to accept if not 0 */
	int		thrs_push; /* number of threads waiting to push an item (queue full) */

	uint16_t	high;	/* High level threshold (see libfreeDiameter.h for details) */
	uint16_t	low;	/* Low level threshold */
	void 		*data;	/* Opaque pointer for threshold callbacks; its address is passed to h_cb / l_cb */
	void		(*h_cb)(struct fifo *, void **); /* The callbacks: h_cb for the high threshold ... */
	void		(*l_cb)(struct fifo *, void **); /* ... and l_cb for the low threshold */
	int 		highest;/* The highest count value for which h_cb has been called */
	int		highest_ever; /* The max count value this queue has reached (for tweaking) */

	long long	total_items;   /* Cumulated number of items that went through this fifo (excluding current count), always increasing. */
	struct timespec total_time;    /* Cumulated time all items spent in this queue, including blocking time (always growing, use deltas for monitoring) */
	struct timespec blocking_time; /* Cumulated time threads trying to post new items were blocked (queue full). */
	struct timespec last_time;     /* For the last element retrieved from the queue, how long it took between posting (including blocking) and popping */

};

/* One element stored in a queue: the list linkage and the time at which it was posted. */
struct fifo_item {
	struct fd_list   item;       /* Chaining in fifo.list; item.o holds the queued object. Must stay the first member: list pointers are cast to struct fifo_item * */
	struct timespec  posted_on;  /* Time taken on entry of the post call, i.e. before any blocking on a full queue */
};

/* The eye catcher value stored in fifo.eyec for as long as the queue is valid */
#define FIFO_EYEC	0xe7ec1130

/* Macro to check a pointer: true when _queue is not NULL and carries the eye catcher.
 * Note that _queue is evaluated twice. */
#define CHECK_FIFO( _queue ) (( (_queue) != NULL) && ( (_queue)->eyec == FIFO_EYEC) )


/* Create a new queue, with max number of items -- use 0 for no max */
int fd_fifo_new ( struct fifo ** queue, int max )
{
	struct fifo * new;

	TRACE_ENTRY( "%p", queue );

	CHECK_PARAMS( queue );

	/* Create a new object */
	CHECK_MALLOC( new = malloc (sizeof (struct fifo) )  );

	/* Initialize the content */
	memset(new, 0, sizeof(struct fifo));

	new->eyec = FIFO_EYEC;
	CHECK_POSIX( pthread_mutex_init(&new->mtx, NULL) );
	CHECK_POSIX( pthread_cond_init(&new->cond_pull, NULL) );
	CHECK_POSIX( pthread_cond_init(&new->cond_push, NULL) );
	new->max = max;

	fd_list_init(&new->list, NULL);

	/* We're done */
	*queue = new;
	return 0;
}

int fd_fifo_set_max (struct fifo * queue, int max)
{
    queue->max = max;
    return 0;
}


/* Dump the content of a queue.
 *
 *  name      : optional label printed in front of the queue address; "{fifo}" is used when NULL.
 *  queue     : the queue to describe; an invalid or NULL queue is reported as "INVALID/NULL".
 *  dump_item : optional callback; when set, it is called for each queued object (item.o)
 *              to append its description after the item index and posting time.
 *
 * Returns *buf when everything was appended, NULL when extending the buffer failed.
 * The queue mutex is held while the fields and items are read; if locking fails the
 * dump is still attempted.
 */
DECLARE_FD_DUMP_PROTOTYPE(fd_fifo_dump, char * name, struct fifo * queue, fd_fifo_dump_item_cb dump_item)
{
	FD_DUMP_HANDLE_OFFSET();

	/* Prefix identifying the queue */
	if (name) {
		CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "'%s'(@%p): ", name, queue), return NULL);
	} else {
		CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "{fifo}(@%p): ", queue), return NULL);
	}

	/* Do not touch the fields of something that is not a valid queue */
	if (!CHECK_FIFO( queue )) {
		return fd_dump_extend(FD_DUMP_STD_PARAMS, "INVALID/NULL");
	}

	CHECK_POSIX_DO(  pthread_mutex_lock( &queue->mtx ), /* continue */  );
	/* items: count, highest_ever, max -- threads: pulling, pushing -- stats: total items / total, blocking and last times
	 * (seconds.microseconds) -- thresholds: high, low, highest, h_cb, l_cb, data */
	CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "items:%d,%d,%d threads:%d,%d stats:%lld/%ld.%06ld,%ld.%06ld,%ld.%06ld thresholds:%d,%d,%d,%p,%p,%p",
						queue->count, queue->highest_ever, queue->max,
						queue->thrs, queue->thrs_push,
						queue->total_items,(long)queue->total_time.tv_sec,(long)(queue->total_time.tv_nsec/1000),(long)queue->blocking_time.tv_sec,(long)(queue->blocking_time.tv_nsec/1000),(long)queue->last_time.tv_sec,(long)(queue->last_time.tv_nsec/1000),
						queue->high, queue->low, queue->highest, queue->h_cb, queue->l_cb, queue->data),
			 goto error);

	/* One line per queued item, oldest first, only when the caller knows how to dump an item */
	if (dump_item) {
		struct fd_list * li;
		int i = 0;
		for (li = queue->list.next; li != &queue->list; li = li->next) {
			struct fifo_item * fi = (struct fifo_item *)li; /* valid because item is the first member of struct fifo_item */
			CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n [#%i](@%p)@%ld.%06ld: ",
						i++, fi->item.o, (long)fi->posted_on.tv_sec,(long)(fi->posted_on.tv_nsec/1000)),
					 goto error);
			CHECK_MALLOC_DO( (*dump_item)(FD_DUMP_STD_PARAMS, fi->item.o), goto error);
		}
	}
	CHECK_POSIX_DO(  pthread_mutex_unlock( &queue->mtx ), /* continue */  );

	return *buf;
error:
	/* A dump step failed while the queue was locked: release it before reporting the failure */
	CHECK_POSIX_DO(  pthread_mutex_unlock( &queue->mtx ), /* continue */  );
	return NULL;
}

/* Delete a queue. It must be empty.
 *
 *  queue : location of the queue pointer; it is reset to NULL once the queue has been freed.
 *
 * Returns 0 when the queue was destroyed, or when *queue was already NULL.
 * Returns EINVAL when the queue still contains items or still has threshold data attached.
 * Other failures come from the parameter check and the mutex / condition operations.
 *
 * Threads waiting for an item are signalled one at a time until none is left waiting; the
 * queue is marked invalid first (eyec = 0xdead).
 */
int fd_fifo_del ( struct fifo  ** queue )
{
	struct fifo * q;
#ifdef DEBUG
	int loops = 0; /* only used by the ASSERT in the wake-up loop below */
#endif

	TRACE_ENTRY( "%p", queue );

	if (queue && *queue == NULL) {
		/* Queue already (in the process of being) deleted */
		return 0;
	}

	CHECK_PARAMS( queue && CHECK_FIFO( *queue ) );

	q = *queue;

	CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );

	/* Refuse to destroy a queue that still owns items or threshold data */
	if ((q->count != 0) || (q->data != NULL)) {
		TRACE_DEBUG(INFO, "The queue cannot be destroyed (%d, %p)", q->count, q->data);
		CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ), /* no fallback */  );
		return EINVAL;
	}

	/* Ok, now invalidate the queue */
	q->eyec = 0xdead;

	/* Have all waiting threads return an error */
	while (q->thrs) {
		/* Release the mutex so that the signalled thread can re-acquire it and notice the invalid queue */
		CHECK_POSIX(  pthread_mutex_unlock( &q->mtx ));
		CHECK_POSIX(  pthread_cond_signal(&q->cond_pull)  );
		usleep(1000);

		CHECK_POSIX(  pthread_mutex_lock( &q->mtx )  );
		/* NOTE(review): loops only exists when DEBUG is defined; presumably ASSERT expands to nothing otherwise -- confirm */
		ASSERT( ++loops < 200 ); /* detect infinite loops */
	}

	/* sanity check */
	ASSERT(FD_IS_LIST_EMPTY(&q->list));

	/* And destroy it */
	CHECK_POSIX(  pthread_mutex_unlock( &q->mtx )  );

	CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond_pull ),  );

	CHECK_POSIX_DO(  pthread_cond_destroy( &q->cond_push ),  );

	CHECK_POSIX_DO(  pthread_mutex_destroy( &q->mtx ),  );

	free(q);
	*queue = NULL;

	return 0;
}

/* Move the content of old into new, and update loc_update atomically. We leave the old queue empty but valid */
int fd_fifo_move ( struct fifo * old, struct fifo * new, struct fifo ** loc_update )
{
#ifdef DEBUG
	int loops = 0;
#endif

	TRACE_ENTRY("%p %p %p", old, new, loc_update);
	CHECK_PARAMS( CHECK_FIFO( old ) && CHECK_FIFO( new ));

	CHECK_PARAMS( ! old->data );
	if (new->high) {
		TODO("Implement support for thresholds in fd_fifo_move...");
	}

	/* Update loc_update */
	if (loc_update)
		*loc_update = new;

	/* Lock the queues */
	CHECK_POSIX(  pthread_mutex_lock( &old->mtx )  );

	CHECK_PARAMS_DO( (! old->thrs_push), {
			pthread_mutex_unlock( &old->mtx );
			return EINVAL;
		} );

	CHECK_POSIX(  pthread_mutex_lock( &new->mtx )  );

	/* Any waiting thread on the old queue returns an error */
	old->eyec = 0xdead;
	while (old->thrs) {
		CHECK_POSIX(  pthread_mutex_unlock( &old->mtx ));
		CHECK_POSIX(  pthread_cond_signal( &old->cond_pull )  );
		usleep(1000);

		CHECK_POSIX(  pthread_mutex_lock( &old->mtx )  );
		ASSERT( loops < 20 ); /* detect infinite loops */
	}

	/* Move all data from old to new */
	fd_list_move_end( &new->list, &old->list );
	if (old->count && (!new->count)) {
		CHECK_POSIX(  pthread_cond_signal(&new->cond_pull)  );
	}
	new->count += old->count;

	/* Reset old */
	old->count = 0;
	old->eyec = FIFO_EYEC;

	/* Merge the stats in the new queue */
	new->total_items += old->total_items;
	old->total_items = 0;

	new->total_time.tv_nsec += old->total_time.tv_nsec;
	new->total_time.tv_sec += old->total_time.tv_sec + (new->total_time.tv_nsec / 1000000000);
	new->total_time.tv_nsec %= 1000000000;
	old->total_time.tv_nsec = 0;
	old->total_time.tv_sec = 0;

	new->blocking_time.tv_nsec += old->blocking_time.tv_nsec;
	new->blocking_time.tv_sec += old->blocking_time.tv_sec + (new->blocking_time.tv_nsec / 1000000000);
	new->blocking_time.tv_nsec %= 1000000000;
	old->blocking_time.tv_nsec = 0;
	old->blocking_time.tv_sec = 0;

	/* Unlock, we're done */
	CHECK_POSIX(  pthread_mutex_unlock( &new->mtx )  );
	CHECK_POSIX(  pthread_mutex_unlock( &old->mtx )  );

	return 0;
}

/*
 * Get the information on the queue.
 *
 * Every output pointer is optional; only the non-NULL ones are filled, all under the
 * queue mutex so that the values are consistent with each other. A NULL queue is
 * accepted and reported as success without writing anything.
 */
int fd_fifo_getstats( struct fifo * queue, int * current_count, int * limit_count, int * highest_count, long long * total_count,
				           struct timespec * total, struct timespec * blocking, struct timespec * last)
{
	TRACE_ENTRY( "%p %p %p %p %p %p %p %p", queue, current_count, limit_count, highest_count, total_count, total, blocking, last);

	/* It is not an error if the queue is not available; happens e.g. when peers disappear */
	if (!queue)
		return 0;

	/* Anything else must be a valid queue */
	CHECK_PARAMS( CHECK_FIFO( queue ) );

	CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );

	/* Counters */
	if (current_count != NULL)
		*current_count = queue->count;
	if (limit_count != NULL)
		*limit_count = queue->max;
	if (highest_count != NULL)
		*highest_count = queue->highest_ever;
	if (total_count != NULL)
		*total_count = queue->total_items;

	/* Timings */
	if (total != NULL)
		*total = queue->total_time;
	if (blocking != NULL)
		*blocking = queue->blocking_time;
	if (last != NULL)
		*last = queue->last_time;

	CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );

	return 0;
}


/* Lock-free variant with no error reporting: the current number of items, or 0 when
 * the queue is NULL or invalid. The count is read without holding the mutex, so the
 * value is only indicative. */
int fd_fifo_length ( struct fifo * queue )
{
	return CHECK_FIFO( queue ) ? queue->count : 0;
}

/*
 * Set the thresholds of the queue.
 *
 * Requires a valid queue that has no threshold data attached yet, and high > low.
 * The two levels, the callbacks and the opaque data are then stored together
 * while the queue mutex is held.
 */
int fd_fifo_setthrhd ( struct fifo * queue, void * data, uint16_t high, void (*h_cb)(struct fifo *, void **), uint16_t low, void (*l_cb)(struct fifo *, void **) )
{
	TRACE_ENTRY( "%p %p %hu %p %hu %p", queue, data, high, h_cb, low, l_cb );

	CHECK_PARAMS( CHECK_FIFO( queue ) && (high > low) && (queue->data == NULL) );

	CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );

	/* Callbacks and the data handed to them */
	queue->h_cb = h_cb;
	queue->l_cb = l_cb;
	queue->data = data;

	/* Levels that trigger them */
	queue->low  = low;
	queue->high = high;

	CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );

	return 0;
}


/* Cancellation cleanup handler for a thread blocked while posting into a full queue.
 * It runs with the queue mutex held again: the thread is removed from the count of
 * blocked posters, then the mutex is released. */
static void fifo_cleanup_push(void * queue)
{
	struct fifo * q = queue;

	TRACE_ENTRY( "%p", queue );

	/* This thread is not going to post anymore */
	q->thrs_push -= 1;

	/* Nobody else will release the mutex for us */
	CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ),  /* nothing */  );
}


/* Post a new item in the queue */
int fd_fifo_post_internal ( struct fifo * queue, void ** item, int skip_max )
{
	struct fifo_item * new;
	int call_cb = 0;
	struct timespec posted_on, queued_on;

	/* Get the timing of this call */
	CHECK_SYS(  clock_gettime(CLOCK_REALTIME, &posted_on)  );

	/* lock the queue */
	CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );

	if ((!skip_max) && (queue->max)) {
		while (queue->count >= queue->max) {
			int ret = 0;

			/* We have to wait for an item to be pulled */
			queue->thrs_push++ ;
			pthread_cleanup_push( fifo_cleanup_push, queue);
			ret = pthread_cond_wait( &queue->cond_push, &queue->mtx );
			pthread_cleanup_pop(0);
			queue->thrs_push-- ;

#ifndef DEBUG
			(void)ret;
#endif
			ASSERT( ret == 0 );
		}
	}

	/* Create a new list item */
	CHECK_MALLOC_DO(  new = malloc (sizeof (struct fifo_item)) , {
			pthread_mutex_unlock( &queue->mtx );
			return ENOMEM;
		} );

	fd_list_init(&new->item, *item);
	*item = NULL;

	/* Add the new item at the end */
	fd_list_insert_before( &queue->list, &new->item);
	queue->count++;
	if (queue->highest_ever < queue->count)
		queue->highest_ever = queue->count;
	if (queue->high && ((queue->count % queue->high) == 0)) {
		call_cb = 1;
		queue->highest = queue->count;
	}

	/* store timing */
	memcpy(&new->posted_on, &posted_on, sizeof(struct timespec));

	/* update queue timing info "blocking time" */
	{
		long long blocked_ns;
		CHECK_SYS(  clock_gettime(CLOCK_REALTIME, &queued_on)  );
		blocked_ns = (queued_on.tv_sec - posted_on.tv_sec) * 1000000000;
		blocked_ns += (queued_on.tv_nsec - posted_on.tv_nsec);
		blocked_ns += queue->blocking_time.tv_nsec;
		queue->blocking_time.tv_sec += blocked_ns / 1000000000;
		queue->blocking_time.tv_nsec = blocked_ns % 1000000000;
	}

	/* Signal if threads are asleep */
	if (queue->thrs > 0) {
		CHECK_POSIX(  pthread_cond_signal(&queue->cond_pull)  );
	}
	if (queue->thrs_push > 0) {
		/* cascade */
		CHECK_POSIX(  pthread_cond_signal(&queue->cond_push)  );
	}

	/* Unlock */
	CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );

	/* Call high-watermark cb as needed */
	if (call_cb && queue->h_cb)
		(*queue->h_cb)(queue, &queue->data);

	/* Done */
	return 0;
}

/* Post a new item in the queue, honoring the max limit of the queue: the call goes
 * through fd_fifo_post_internal with skip_max = 0, which waits while a limited queue
 * is full. The queue must be valid and *item must not be NULL; on success *item is
 * reset to NULL by fd_fifo_post_internal. */
int fd_fifo_post_int ( struct fifo * queue, void ** item )
{
	TRACE_ENTRY( "%p %p", queue, item );

	/* Check the parameters */
	CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );

	return fd_fifo_post_internal ( queue,item, 0 );

}

/* Post a new item in the queue, not blocking: the call goes through
 * fd_fifo_post_internal with skip_max = 1, so the max limit of the queue is ignored
 * and the item is queued even when the queue is already full. The queue must be valid
 * and *item must not be NULL; on success *item is reset to NULL by fd_fifo_post_internal. */
int fd_fifo_post_noblock ( struct fifo * queue, void ** item )
{
	TRACE_ENTRY( "%p %p", queue, item );

	/* Check the parameters */
	CHECK_PARAMS( CHECK_FIFO( queue ) && item && *item );

	return fd_fifo_post_internal ( queue,item, 1 );

}

/*
 * Pop the first item from the queue and return the object it carried.
 *
 * The caller must hold the queue mutex and must have checked that the queue is not
 * empty. The item counters and the timing statistics (last_time, total_time) are
 * updated, the list element is freed, and one thread blocked while posting, if any,
 * is signalled since room was just made.
 */
static void * mq_pop(struct fifo * queue)
{
	void * ret = NULL;
	struct fifo_item * fi;
	struct timespec now;

	ASSERT( ! FD_IS_LIST_EMPTY(&queue->list) );

	/* item is the first member of struct fifo_item, so the list pointer is also the element pointer */
	fi = (struct fifo_item *)(queue->list.next);
	ret = fi->item.o;
	fd_list_unlink(&fi->item);
	queue->count--;
	queue->total_items++;

	/* Update the timings; if the clock is not available only the statistics are skipped */
	CHECK_SYS_DO(  clock_gettime(CLOCK_REALTIME, &now), goto skip_timing  );
	{
		/* widen before multiplying: time_t may be 32 bits */
		long long elapsed = (long long)(now.tv_sec - fi->posted_on.tv_sec) * 1000000000LL;
		elapsed += now.tv_nsec - fi->posted_on.tv_nsec;
		if (elapsed < 0)
			elapsed = 0; /* the realtime clock was stepped backwards while the item was queued */

		queue->last_time.tv_sec = elapsed / 1000000000LL;
		queue->last_time.tv_nsec = elapsed % 1000000000LL;

		/* accumulate, carrying whole seconds out of the nanoseconds field */
		elapsed += queue->total_time.tv_nsec;
		queue->total_time.tv_sec += elapsed / 1000000000LL;
		queue->total_time.tv_nsec = elapsed % 1000000000LL;
	}
skip_timing:
	free(fi);

	if (queue->thrs_push) {
		CHECK_POSIX_DO( pthread_cond_signal( &queue->cond_push ), );
	}

	return ret;
}

/* Tell whether the low watermark callback must be called after an item was popped.
 * Returns 1 (and lowers queue->highest by one high step) when the count sits on the
 * low level below the last high level that was reached, 0 otherwise. */
static __inline__ int test_l_cb(struct fifo * queue)
{
	int on_low_level;

	/* Thresholds or callback not configured: never fire */
	if (!queue->high || !queue->low || !queue->l_cb)
		return 0;

	on_low_level = ((queue->count % queue->high) == queue->low);
	if (!on_low_level || (queue->highest <= queue->count))
		return 0;

	queue->highest -= queue->high;
	return 1;
}

/*
 * Try popping an item, without waiting for one to be posted.
 *
 * Returns 0 with the object in *item, or EWOULDBLOCK with *item set to NULL when the
 * queue is empty. When the queue is empty but a thread is blocked while posting, that
 * thread is given a short chance to complete before the queue is looked at again.
 */
int fd_fifo_tryget_int ( struct fifo * queue, void ** item )
{
	int wouldblock = 0;
	int call_cb = 0;

	TRACE_ENTRY( "%p %p", queue, item );

	/* Check the parameters */
	CHECK_PARAMS( CHECK_FIFO( queue ) && item );

	/* lock the queue */
	CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );

	if ((queue->count <= 0) && (queue->thrs_push > 0)) {
		/* A thread is trying to push something, let's give it a chance */
		CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
		CHECK_POSIX(  pthread_cond_signal( &queue->cond_push )  );
		usleep(1000);
		CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );
	}

	if (queue->count > 0) {
		/* There are elements in the queue, so pick the first one */
		*item = mq_pop(queue);
		call_cb = test_l_cb(queue);
	} else {
		/* Still nothing to return */
		wouldblock = 1;
		*item = NULL;
	}

	/* Unlock */
	CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );

	/* Call low watermark callback as needed */
	if (call_cb)
		(*queue->l_cb)(queue, &queue->data);

	if (wouldblock)
		return EWOULDBLOCK;

	return 0;
}

/* Cancellation cleanup handler for a thread blocked while waiting for an item.
 * It runs with the queue mutex held again: the thread is removed from the count of
 * waiting threads, then the mutex is released. */
static void fifo_cleanup(void * queue)
{
	struct fifo * q = queue;

	TRACE_ENTRY( "%p", queue );

	/* This thread does not wait on the queue anymore */
	q->thrs -= 1;

	/* Nobody else will release the mutex for us */
	CHECK_POSIX_DO(  pthread_mutex_unlock( &q->mtx ),  /* nothing */  );
}

/*
 * The internal function for fd_fifo_timedget and fd_fifo_get.
 *
 *  queue   : the queue to read from.
 *  item    : receives the object; left at NULL when nothing was retrieved.
 *  istimed : when not 0 the wait is bounded by abstime, which is then mandatory.
 *  abstime : absolute deadline of a timed wait.
 *
 * Returns 0 with an object in *item, EPIPE when the queue got invalidated, or the
 * error of the wait (e.g. ETIMEDOUT) when it ended without a wake-up.
 */
static int fifo_tget ( struct fifo * queue, void ** item, int istimed, const struct timespec *abstime)
{
	int call_cb = 0;
	int ret = 0;

	/* Check the parameters */
	CHECK_PARAMS( CHECK_FIFO( queue ) && item && (abstime || !istimed) );

	/* Nothing retrieved so far */
	*item = NULL;

	/* lock the queue */
	CHECK_POSIX(  pthread_mutex_lock( &queue->mtx )  );

	/* Each pass re-examines the queue from scratch, which also covers spurious wake-ups */
	for (;;) {
		if (!CHECK_FIFO( queue )) {
			/* The queue is being destroyed */
			CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );
			TRACE_DEBUG(FULL, "The queue is being destroyed -> EPIPE");
			return EPIPE;
		}

		if (queue->count > 0) {
			/* There are items in the queue, so pick the first one */
			*item = mq_pop(queue);
			call_cb = test_l_cb(queue);
			break;
		}

		/* We have to wait for a new item */
		queue->thrs++ ;
		pthread_cleanup_push( fifo_cleanup, queue);
		if (istimed) {
			ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime );
		} else {
			ret = pthread_cond_wait( &queue->cond_pull, &queue->mtx );
		}
		pthread_cleanup_pop(0);
		queue->thrs-- ;

		/* ETIMEDOUT / other error: give up and report it */
		if (ret != 0)
			break;
	}

	/* Unlock */
	CHECK_POSIX(  pthread_mutex_unlock( &queue->mtx )  );

	/* Call low watermark callback as needed */
	if (call_cb)
		(*queue->l_cb)(queue, &queue->data);

	/* Done */
	return ret;
}

/* Get the next available item, block until there is one.
 * Thin wrapper: traces the call, then runs fifo_tget without a deadline (istimed = 0)
 * and returns its result unchanged. */
int fd_fifo_get_int ( struct fifo * queue, void ** item )
{
	TRACE_ENTRY( "%p %p", queue, item );
	return fifo_tget(queue, item, 0, NULL);
}

/* Get the next available item, block until there is one, or the timeout expires.
 * Thin wrapper: traces the call, then runs fifo_tget with abstime as the absolute
 * deadline (istimed = 1) and returns its result unchanged; fifo_tget rejects a NULL
 * abstime in that mode. */
int fd_fifo_timedget_int ( struct fifo * queue, void ** item, const struct timespec *abstime )
{
	TRACE_ENTRY( "%p %p %p", queue, item, abstime );
	return fifo_tget(queue, item, 1, abstime);
}

/* Test if data is available in the queue, without pulling it */
int fd_fifo_select ( struct fifo * queue, const struct timespec *abstime )
{
	int ret = 0;
	TRACE_ENTRY( "%p %p", queue, abstime );

	CHECK_PARAMS_DO( CHECK_FIFO( queue ), return -EINVAL );

	/* lock the queue */
	CHECK_POSIX_DO(  pthread_mutex_lock( &queue->mtx ), return -__ret__  );

awaken:
	ret = (queue->count > 0 ) ? queue->count : 0;
	if ((ret == 0) && (abstime != NULL)) {
		/* We have to wait for a new item */
		queue->thrs++ ;
		pthread_cleanup_push( fifo_cleanup, queue);
		ret = pthread_cond_timedwait( &queue->cond_pull, &queue->mtx, abstime );
		pthread_cleanup_pop(0);
		queue->thrs-- ;
		if (ret == 0)
			goto awaken;  /* test for spurious wake-ups */

		if (ret == ETIMEDOUT)
			ret = 0;
		else
			ret = -ret;
	}

	/* Unlock */
	CHECK_POSIX_DO(  pthread_mutex_unlock( &queue->mtx ), return -__ret__  );

	return ret;
}
"Welcome to our mercurial repository"