Mercurial > hg > freeDiameter
diff freeDiameter/peers.c @ 13:ef9ef3bf4752
Progress on peer state machine
author | Sebastien Decugis <sdecugis@nict.go.jp> |
---|---|
date | Wed, 30 Sep 2009 18:25:46 +0900 |
parents | 418d2ce80dc8 |
children | 14cf6daf716d |
line wrap: on
line diff
--- a/freeDiameter/peers.c Mon Sep 28 17:29:25 2009 +0900 +++ b/freeDiameter/peers.c Wed Sep 30 18:25:46 2009 +0900 @@ -50,35 +50,6 @@ struct fd_list fd_g_peers; pthread_rwlock_t fd_g_peers_rw; -static int started = 0; -static pthread_mutex_t started_mtx = PTHREAD_MUTEX_INITIALIZER; -static pthread_cond_t started_cnd = PTHREAD_COND_INITIALIZER; - -/* Wait for start signal */ -int fd_peer_waitstart() -{ - CHECK_POSIX( pthread_mutex_lock(&started_mtx) ); -awake: - if (! started) { - pthread_cleanup_push( fd_cleanup_mutex, &started_mtx ); - CHECK_POSIX( pthread_cond_wait(&started_cnd, &started_mtx) ); - pthread_cleanup_pop( 0 ); - goto awake; - } - CHECK_POSIX( pthread_mutex_unlock(&started_mtx) ); - return 0; -} - -/* Allow the state machines to start */ -int fd_peer_start() -{ - CHECK_POSIX( pthread_mutex_lock(&started_mtx) ); - started = 1; - CHECK_POSIX( pthread_cond_broadcast(&started_cnd) ); - CHECK_POSIX( pthread_mutex_unlock(&started_mtx) ); - return 0; -} - /* Initialize the peers list */ int fd_peer_init() { @@ -87,6 +58,20 @@ fd_list_init(&fd_g_peers, NULL); CHECK_POSIX( pthread_rwlock_init(&fd_g_peers_rw, NULL) ); + CHECK_FCT(fd_p_expi_init()); + + return 0; +} + +/* Terminate peer module (destroy all peers) */ +int fd_peer_fini() +{ + TRACE_ENTRY(); + + CHECK_FCT_DO(fd_p_expi_fini(), /* continue */); + + TODO("Complete this function") + return 0; } @@ -118,8 +103,235 @@ CHECK_FCT_DO( pthread_rwlock_unlock(&fd_g_peers_rw), /* continue */ ); } +/* Alloc / reinit a peer structure. if *ptr is not NULL, it must already point to a valid struct fd_peer. */ +static int fd_sp_reinit(struct fd_peer ** ptr) +{ + struct fd_peer *p; + + TRACE_ENTRY("%p", ptr); + CHECK_PARAMS(ptr); + + if (*ptr) { + p = *ptr; + } else { + CHECK_MALLOC( p = malloc(sizeof(struct fd_peer)) ); + *ptr = p; + } + + /* Now initialize the content */ + memset(p, 0, sizeof(struct fd_peer)); + + fd_list_init(&p->p_hdr.chain, p); + + fd_list_init(&p->p_hdr.info.pi_endpoints, NULL); + p->p_hdr.info.pi_state = STATE_DISABLED; + fd_list_init(&p->p_hdr.info.pi_apps, NULL); + + p->p_eyec = EYEC_PEER; + CHECK_POSIX( pthread_mutex_init(&p->p_mtx, NULL) ); + fd_list_init(&p->p_expiry, p); + fd_list_init(&p->p_actives, p); + p->p_hbh = lrand48(); + CHECK_FCT( fd_fifo_new(&p->p_events) ); + CHECK_FCT( fd_fifo_new(&p->p_recv) ); + CHECK_FCT( fd_fifo_new(&p->p_tosend) ); + fd_list_init(&p->p_sentreq, p); + + return 0; +} + +#define free_null( _v ) \ + if (_v) { \ + free(_v); \ + (_v) = NULL; \ + } + +#define free_list( _l ) \ + while (!FD_IS_LIST_EMPTY(_l)) { \ + struct fd_list * __li = ((struct fd_list *)(_l))->next; \ + fd_list_unlink(__li); \ + free(__li); \ + } + +/* Destroy a structure once all cleanups have been performed */ +static int fd_sp_destroy(struct fd_peer ** ptr) +{ + struct fd_peer *p; + void * t; + + TRACE_ENTRY("%p", ptr); + CHECK_PARAMS(ptr); + p = *ptr; + *ptr = NULL; + CHECK_PARAMS(p); + + CHECK_PARAMS( (p->p_refcount == 0) && FD_IS_LIST_EMPTY(&p->p_hdr.chain) ); + + free_null(p->p_hdr.info.pi_diamid); + free_null(p->p_hdr.info.pi_realm); + free_list( &p->p_hdr.info.pi_endpoints ); + /* Assume the security data is already freed */ + free_null(p->p_hdr.info.pi_prodname); + free_list( &p->p_hdr.info.pi_apps ); + + free_null(p->p_dbgorig); + CHECK_POSIX( pthread_mutex_destroy(&p->p_mtx) ); + ASSERT(FD_IS_LIST_EMPTY(&p->p_expiry)); + ASSERT(FD_IS_LIST_EMPTY(&p->p_actives)); + + CHECK_FCT( fd_thr_term(&p->p_psm) ); + while ( fd_fifo_tryget(p->p_events, &t) == 0 ) { + struct fd_event * ev = t; + TRACE_DEBUG(FULL, "Found event %d(%p) in queue of peer %p being destroyed", ev->code, ev->data, p); + free(ev); + } + CHECK_FCT( fd_fifo_del(&p->p_events) ); + + CHECK_FCT( fd_thr_term(&p->p_inthr) ); + while ( fd_fifo_tryget(p->p_recv, &t) == 0 ) { + struct msg * m = t; + TRACE_DEBUG(FULL, "Found message %p in incoming queue of peer %p being destroyed", m, p); + /* We simply destroy, the remote peer will re-send to someone else...*/ + CHECK_FCT(fd_msg_free(m)); + } + CHECK_FCT( fd_fifo_del(&p->p_recv) ); + + CHECK_FCT( fd_thr_term(&p->p_outthr) ); + while ( fd_fifo_tryget(p->p_tosend, &t) == 0 ) { + struct msg * m = t; + TRACE_DEBUG(FULL, "Found message %p in outgoing queue of peer %p being destroyed, requeue", m, p); + /* We simply requeue in global, the routing thread will re-handle it. */ + + } + CHECK_FCT( fd_fifo_del(&p->p_tosend) ); + + while (!FD_IS_LIST_EMPTY(&p->p_sentreq)) { + struct sentreq * sr = (struct sentreq *)(p->p_sentreq.next); + fd_list_unlink(&sr->chain); + TRACE_DEBUG(FULL, "Found message %p in list of sent requests to peer %p being destroyed, requeue (fallback)", sr->req, p); + CHECK_FCT(fd_fifo_post(fd_g_outgoing, &sr->req)); + free(sr); + } + + TRACE_DEBUG(NONE, "TODO: destroy p->p_cnxctx here"); + + if (p->p_cb) + (*p->p_cb)(NULL, p->p_cb_data); + + free(p); + + return 0; +} + +/* Decrement refcount, delete if 0 */ +int fd_peer_rc_decr(struct fd_peer **ptr, int locked) +{ + int count; + struct fd_peer *p; + TRACE_ENTRY("%p %d", p, locked); + + CHECK_PARAMS(ptr && CHECK_PEER( *ptr )); + p = *ptr; + + if (!locked) { + CHECK_POSIX( pthread_rwlock_rdlock(&fd_g_peers_rw) ); + CHECK_POSIX( pthread_mutex_lock( &p->p_mtx ) ); + CHECK_POSIX( pthread_rwlock_unlock(&fd_g_peers_rw) ); + } + + count = --(p->p_refcount); + + if (!locked) { + CHECK_POSIX( pthread_mutex_unlock( &p->p_mtx ) ); + } + + if (count <= 0) { + /* All links have already been removed, we can destroy */ + CHECK_FCT( fd_sp_destroy(ptr) ); + } + return 0; +} + /* Add a new peer entry */ int fd_peer_add ( struct peer_info * info, char * orig_dbg, void (*cb)(struct peer_info *, void *), void * cb_data ) { - return ENOTSUP; + struct fd_peer *p = NULL; + struct fd_list * li; + int ret = 0; + TRACE_ENTRY("%p %p %p %p", info, orig_dbg, cb, cb_data); + CHECK_PARAMS(info && info->pi_diamid); + + /* Create a structure to contain the new peer information */ + CHECK_FCT( fd_sp_reinit(&p) ); + + /* Copy the informations from the parameters received */ + CHECK_MALLOC( p->p_hdr.info.pi_diamid = strdup(info->pi_diamid) ); + if (info->pi_realm) { + CHECK_MALLOC( p->p_hdr.info.pi_realm = strdup(info->pi_realm) ); + } + + p->p_hdr.info.pi_flags.pro3 = info->pi_flags.pro3; + p->p_hdr.info.pi_flags.pro4 = info->pi_flags.pro4; + p->p_hdr.info.pi_flags.alg = info->pi_flags.alg; + p->p_hdr.info.pi_flags.sec = info->pi_flags.sec; + p->p_hdr.info.pi_flags.exp = info->pi_flags.exp; + + p->p_hdr.info.pi_lft = info->pi_lft; + p->p_hdr.info.pi_streams = info->pi_streams; + p->p_hdr.info.pi_port = info->pi_port; + p->p_hdr.info.pi_tctimer = info->pi_tctimer; + p->p_hdr.info.pi_twtimer = info->pi_twtimer; + + /* Move the items from one list to the other */ + while (!FD_IS_LIST_EMPTY( &info->pi_endpoints ) ) { + li = info->pi_endpoints.next; + fd_list_unlink(li); + fd_list_insert_before(&p->p_hdr.info.pi_endpoints, li); + } + + p->p_hdr.info.pi_sec_module = info->pi_sec_module; + memcpy(&p->p_hdr.info.pi_sec_data, &info->pi_sec_data, sizeof(info->pi_sec_data)); + + /* The internal data */ + if (orig_dbg) { + CHECK_MALLOC( p->p_dbgorig = strdup(orig_dbg) ); + } else { + CHECK_MALLOC( p->p_dbgorig = strdup("unknown") ); + } + p->p_cb = cb; + p->p_cb_data = cb_data; + + /* Ok, now check if we don't already have an entry with the same Diameter Id, and insert this one */ + CHECK_POSIX( pthread_rwlock_wrlock(&fd_g_peers_rw) ); + CHECK_POSIX( pthread_mutex_lock( &p->p_mtx ) ); + + for (li = fd_g_peers.next; li != &fd_g_peers; li = li->next) { + struct fd_peer * prev = (struct fd_peer *)li; + int cmp = strcasecmp( p->p_hdr.info.pi_diamid, prev->p_hdr.info.pi_diamid ); + if (cmp < 0) + continue; + if (cmp == 0) + ret = EEXIST; + break; + } + + /* We can insert the new peer object */ + if (! ret) { + /* Update expiry list */ + CHECK_FCT_DO( ret = fd_p_expi_update( p, 1 ), goto out ); + + /* Insert the new element in the list */ + fd_list_insert_before( li, &p->p_hdr.chain ); + p->p_refcount++; + } + +out: + CHECK_POSIX( pthread_mutex_unlock( &p->p_mtx ) ); + CHECK_POSIX( pthread_rwlock_unlock(&fd_g_peers_rw) ); + if (ret) { + CHECK_FCT( fd_sp_destroy(&p) ); + } else { + CHECK_FCT( fd_psm_start(p) ); + } + return ret; }