Changeset 1189:50bf33dc8fe0 in freeDiameter for libfdcore
- Timestamp:
- Jun 10, 2013, 5:23:09 PM (11 years ago)
- Branch:
- default
- Phase:
- public
- Location:
- libfdcore
- Files:
-
- 4 edited
Legend:
- Unmodified
- Added
- Removed
-
libfdcore/config.c
r1184 r1189 59 59 fd_g_config->cnf_port_tls = DIAMETER_SECURE_PORT; 60 60 fd_g_config->cnf_sctp_str = 30; 61 fd_g_config->cnf_thr_srv = 5; 61 62 fd_g_config->cnf_dispthr = 4; 62 63 fd_list_init(&fd_g_config->cnf_endpoints, NULL); … … 98 99 } 99 100 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Number of SCTP streams . : %hu\n", fd_g_config->cnf_sctp_str), return NULL); 100 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Number of server threads : %hu\n", fd_g_config->cnf_dispthr), return NULL); 101 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Number of clients thr .. : %d\n", fd_g_config->cnf_thr_srv), return NULL); 102 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Number of app threads .. : %hu\n", fd_g_config->cnf_dispthr), return NULL); 101 103 if (FD_IS_LIST_EMPTY(&fd_g_config->cnf_endpoints)) { 102 104 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " Local endpoints ........ : Default (use all available)\n"), return NULL); -
libfdcore/fdd.l
r1187 r1189 256 256 (?i:"AppServThreads") { return APPSERVTHREADS;} 257 257 (?i:"ListenOn") { return LISTENON; } 258 (?i:"ThreadsPerServer") { return THRPERSRV; } 258 259 (?i:"TcTimer") { return TCTIMER; } 259 260 (?i:"TwTimer") { return TWTIMER; } -
libfdcore/fdd.y
r1180 r1189 109 109 %token APPSERVTHREADS 110 110 %token LISTENON 111 %token THRPERSRV 111 112 %token TCTIMER 112 113 %token TWTIMER … … 137 138 | conffile sctpstreams 138 139 | conffile listenon 140 | conffile thrpersrv 139 141 | conffile norelay 140 142 | conffile appservthreads … … 236 238 freeaddrinfo(ai); 237 239 free($3); 240 } 241 ; 242 243 thrpersrv: THRPERSRV '=' INTEGER ';' 244 { 245 CHECK_PARAMS_DO( ($3 > 0), 246 { yyerror (&yylloc, conf, "Invalid value"); YYERROR; } ); 247 conf->cnf_thr_srv = $3; 238 248 } 239 249 ; -
libfdcore/server.c
r1187 r1189 54 54 struct cnxctx * conn; /* server connection context (listening socket) */ 55 55 int proto; /* IPPROTO_TCP or IPPROTO_SCTP */ 56 int secur; /* TLS is started immediatly after connection ? 0: no; 2: yes (TLS/TCP or TLS/SCTP) */57 58 pthread_t thr; /* The thread listening for new connections*/56 int secur; /* TLS is started immediatly after connection ? 0: no; 1: RFU; 2: yes (TLS/TCP or TLS/SCTP) */ 57 58 pthread_t thr; /* The thread waiting for new connections (will store the data in the clients fifo) */ 59 59 enum s_state state; /* state of the thread */ 60 60 61 struct fd_list clients; /* List of clients connected to this server, not yet identified */ 62 pthread_mutex_t clients_mtx; /* Mutex to protect the list of clients */ 61 struct fifo *pending; /* FIFO of struct cnxctx */ 62 struct pool_workers { 63 struct server * s; /* pointer to the parent server structure */ 64 int id; /* The worker id for logs */ 65 pthread_t worker; /* The thread */ 66 } *workers; /* array of cnf_thr_srv items */ 63 67 }; 64 65 66 /* Client information (connecting peer for which we don't have the CER yet) */67 struct client {68 struct fd_list chain; /* link in the server's list of clients */69 struct cnxctx *conn; /* Parameters of the connection */70 struct timespec ts; /* Deadline for receiving CER (after INCNX_TIMEOUT) */71 pthread_t thr; /* connection state machine */72 };73 74 68 75 69 … … 92 86 93 87 88 /* dump one item of the server->pending fifo */ 89 static DECLARE_FD_DUMP_PROTOTYPE(dump_cnx, void * item) { 90 struct cnxctx * c = item; 91 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, " '%s'", fd_cnx_getid(c)), return NULL); 92 return *buf; 93 } 94 94 95 95 /* Dump all servers information */ 96 96 DECLARE_FD_DUMP_PROTOTYPE(fd_servers_dump, int details) 97 97 { 98 struct fd_list * li , *cli;98 struct fd_list * li; 99 99 100 100 FD_DUMP_HANDLE_OFFSET(); … … 113 113 "Thread status unknown"))), return NULL); 114 114 /* Dump the client list of this server */ 115 CHECK_POSIX_DO( pthread_mutex_lock(&s->clients_mtx), ); 116 for (cli = s->clients.next; cli != &s->clients; cli = cli->next) { 117 struct client * c = (struct client *)cli; 118 char bufts[128]; 119 CHECK_MALLOC_DO( fd_dump_extend( FD_DUMP_STD_PARAMS, "\n {client}(@%p)'%s': to:%s", c, fd_cnx_getid(c->conn), fd_log_time(&c->ts, bufts, sizeof(bufts))), break); 120 } 121 CHECK_POSIX_DO( pthread_mutex_unlock(&s->clients_mtx), ); 115 CHECK_MALLOC_DO( fd_fifo_dump(FD_DUMP_STD_PARAMS, "pending connections", s->pending, dump_cnx), return NULL ); 122 116 123 117 if (li->next != &FD_SERVERS) { … … 134 128 135 129 136 /* The state machine to handle incoming connection before the remote peer is identified */ 137 static void * client_sm(void * arg) 138 { 139 struct client * c = arg; 140 struct server * s = NULL; 130 /* The thread in the pool for handling new clients connecting to a server */ 131 static void * client_worker(void * arg) 132 { 133 struct pool_workers * pw = arg; 134 struct server * s = pw->s; 135 struct cnxctx * c = NULL; 136 int fatal = 0; 137 struct timespec ts; 141 138 struct fd_cnx_rcvdata rcv_data; 142 139 struct fd_msg_pmdl * pmdl = NULL; … … 145 142 struct fd_pei pei; 146 143 147 TRACE_ENTRY("%p", c); 148 149 memset(&rcv_data, 0, sizeof(rcv_data)); 150 151 CHECK_PARAMS_DO(c && c->conn && c->chain.head, goto fatal_error ); 152 153 154 s = c->chain.head->o; 155 156 /* Name the current thread */ 157 fd_log_threadname ( fd_cnx_getid(c->conn) ); 158 144 TRACE_ENTRY("%p", arg); 145 146 /* Set the thread name */ 147 { 148 char buf[48]; 149 snprintf(buf, sizeof(buf), "Worker#%d@%s", pw->id, fd_cnx_getid(s->conn)); 150 fd_log_threadname ( buf ); 151 } 152 153 /* Loop until canceled / error */ 154 next_client: 155 LOG_A("Ready to process next incoming connection"); 156 157 /* Get the next connection */ 158 CHECK_FCT_DO( fd_fifo_get( s->pending, &c ), { fatal = 1; goto cleanup; } ); 159 159 160 /* Handshake if we are a secure server port, or start clear otherwise */ 160 161 if (s->secur) { 161 int ret = fd_cnx_handshake(c->conn, GNUTLS_SERVER, (s->secur == 1) ? ALGO_HANDSHAKE_DEFAULT : ALGO_HANDSHAKE_3436, NULL, NULL); 162 LOG_D("Starting handshake with %s", fd_cnx_getid(c)); 163 164 int ret = fd_cnx_handshake(c, GNUTLS_SERVER, (s->secur == 1) ? ALGO_HANDSHAKE_DEFAULT : ALGO_HANDSHAKE_3436, NULL, NULL); 162 165 if (ret != 0) { 163 166 char buf[1024]; 164 snprintf(buf, sizeof(buf), "TLS handshake failed for c lient '%s', connection aborted.", fd_cnx_getid(c->conn));165 167 snprintf(buf, sizeof(buf), "TLS handshake failed for connection '%s', connection closed.", fd_cnx_getid(c)); 168 166 169 fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); 167 170 168 171 goto cleanup; 169 172 } 170 173 } else { 171 CHECK_FCT_DO( fd_cnx_start_clear(c ->conn, 0), goto cleanup );174 CHECK_FCT_DO( fd_cnx_start_clear(c, 0), goto cleanup ); 172 175 } 173 176 174 177 /* Set the timeout to receive the first message */ 175 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &c->ts), goto fatal_error ); 176 c->ts.tv_sec += INCNX_TIMEOUT; 178 CHECK_SYS_DO( clock_gettime(CLOCK_REALTIME, &ts), { fatal = 1; goto cleanup; } ); 179 ts.tv_sec += INCNX_TIMEOUT; 180 181 memset(&rcv_data, 0, sizeof(rcv_data)); 177 182 178 183 /* Receive the first Diameter message on the connection -- cleanup in case of timeout */ 179 CHECK_FCT_DO( fd_cnx_receive(c ->conn, &c->ts, &rcv_data.buffer, &rcv_data.length),184 CHECK_FCT_DO( fd_cnx_receive(c, &ts, &rcv_data.buffer, &rcv_data.length), 180 185 { 181 186 char buf[1024]; … … 183 188 switch (__ret__) { 184 189 case ETIMEDOUT: 185 snprintf(buf, sizeof(buf), "Client '%s' did not send CER within %ds, connection aborted.", fd_cnx_getid(c ->conn), INCNX_TIMEOUT);190 snprintf(buf, sizeof(buf), "Client '%s' did not send CER within %ds, connection aborted.", fd_cnx_getid(c), INCNX_TIMEOUT); 186 191 fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); 187 192 break; 188 193 189 194 case ENOTCONN: 190 snprintf(buf, sizeof(buf), "Connection from '%s' in error before CER was received.", fd_cnx_getid(c ->conn));195 snprintf(buf, sizeof(buf), "Connection from '%s' in error before CER was received.", fd_cnx_getid(c)); 191 196 fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); 192 197 break; 193 198 194 199 default: 195 snprintf(buf, sizeof(buf), "Connection from '%s': unspecified error, connection aborted.", fd_cnx_getid(c ->conn));200 snprintf(buf, sizeof(buf), "Connection from '%s': unspecified error, connection aborted.", fd_cnx_getid(c)); 196 201 fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); 197 202 } … … 199 204 } ); 200 205 201 TRACE_DEBUG(FULL, "Received %zdb from new client '%s'", rcv_data.length, fd_cnx_getid(c ->conn));206 TRACE_DEBUG(FULL, "Received %zdb from new client '%s'", rcv_data.length, fd_cnx_getid(c)); 202 207 203 208 pmdl = fd_msg_pmdl_get_inbuf(rcv_data.buffer, rcv_data.length); … … 212 217 /* Log incoming message */ 213 218 fd_hook_associate(msg, pmdl); 214 fd_hook_call(HOOK_MESSAGE_RECEIVED, msg, NULL, fd_cnx_getid(c ->conn), fd_msg_pmdl_get(msg));219 fd_hook_call(HOOK_MESSAGE_RECEIVED, msg, NULL, fd_cnx_getid(c), fd_msg_pmdl_get(msg)); 215 220 216 221 /* We expect a CER, it must parse with our dictionary and rules */ … … 221 226 fd_hook_call(HOOK_MESSAGE_PARSING_ERROR, msg, NULL, pei.pei_message ?: pei.pei_errcode, fd_msg_pmdl_get(msg)); 222 227 223 snprintf(buf, sizeof(buf), "Error parsing CER from '%s', connection aborted.", fd_cnx_getid(c ->conn));228 snprintf(buf, sizeof(buf), "Error parsing CER from '%s', connection aborted.", fd_cnx_getid(c)); 224 229 fd_hook_call(HOOK_PEER_CONNECT_FAILED, NULL, NULL, buf, NULL); 225 230 … … 228 233 229 234 /* Now check we received a CER */ 230 CHECK_FCT_DO( fd_msg_hdr ( msg, &hdr ), goto fatal_error);235 CHECK_FCT_DO( fd_msg_hdr ( msg, &hdr ), { fatal = 1; goto cleanup; } ); 231 236 CHECK_PARAMS_DO( (hdr->msg_appl == 0) && (hdr->msg_flags & CMD_FLAG_REQUEST) && (hdr->msg_code == CC_CAPABILITIES_EXCHANGE), 232 237 { /* Parsing failed -- trace details */ 233 238 char buf[1024]; 234 snprintf(buf, sizeof(buf), "Expected CER from '%s', received a different message, connection aborted.", fd_cnx_getid(c ->conn));239 snprintf(buf, sizeof(buf), "Expected CER from '%s', received a different message, connection aborted.", fd_cnx_getid(c)); 235 240 fd_hook_call(HOOK_PEER_CONNECT_FAILED, msg, NULL, buf, NULL); 236 241 goto cleanup; 237 242 } ); 238 243 239 /* Finally, pass the information to the peers module which will handle it next*/240 pthread_cleanup_push((void *)fd_cnx_destroy, c ->conn);244 /* Finally, pass the information to the peers module which will handle it in a separate thread */ 245 pthread_cleanup_push((void *)fd_cnx_destroy, c); 241 246 pthread_cleanup_push((void *)fd_msg_free, msg); 242 CHECK_FCT_DO( fd_peer_handle_newCER( &msg, &c ->conn), );247 CHECK_FCT_DO( fd_peer_handle_newCER( &msg, &c ), ); 243 248 pthread_cleanup_pop(0); 244 249 pthread_cleanup_pop(0); 245 246 /* The end, we cleanup the client structure */ 250 247 251 cleanup: 248 /* Unlink the client structure */249 CHECK_POSIX_DO( pthread_mutex_lock(&s->clients_mtx), goto fatal_error );250 fd_list_unlink( &c->chain );251 CHECK_POSIX_DO( pthread_mutex_unlock(&s->clients_mtx), goto fatal_error );252 253 252 /* Cleanup the parsed message if any */ 254 253 if (msg) { 255 254 CHECK_FCT_DO( fd_msg_free(msg), /* continue */); 256 } 257 258 /* Destroy the connection object if present */ 259 if (c->conn) 260 fd_cnx_destroy(c->conn); 255 msg = NULL; 256 } 257 258 /* Close the connection if needed */ 259 if (c != NULL) { 260 fd_cnx_destroy(c); 261 c = NULL; 262 } 261 263 262 264 /* Cleanup the received buffer if any */ 263 265 free(rcv_data.buffer); 264 266 265 /* Detach the thread, cleanup the client structure */ 266 pthread_detach(pthread_self()); 267 free(c); 267 268 if (!fatal) 269 goto next_client; 270 271 LOG_E("Worker thread exiting."); 268 272 return NULL; 269 270 fatal_error: /* This has effect to terminate the daemon */ 271 CHECK_FCT_DO(fd_event_send(fd_g_config->cnf_main_ev, FDEV_TERMINATE, 0, NULL), ); 272 return NULL; 273 } 273 } 274 274 275 275 /* The thread managing a server */ … … 280 280 CHECK_PARAMS_DO(s, goto error); 281 281 fd_log_threadname ( fd_cnx_getid(s->conn) ); 282 282 283 set_status(s, RUNNING); 283 284 … … 286 287 287 288 do { 288 struct client * c = NULL;289 289 struct cnxctx * conn = NULL; 290 290 291 291 /* Wait for a new client or cancel */ 292 CHECK_MALLOC_DO( conn = fd_cnx_serv_accept(s->conn), goto error ); 293 294 /* Create a client structure */ 295 CHECK_MALLOC_DO( c = malloc(sizeof(struct client)), goto error ); 296 memset(c, 0, sizeof(struct client)); 297 fd_list_init(&c->chain, c); 298 c->conn = conn; 299 300 /* Save the client in the list */ 301 CHECK_POSIX_DO( pthread_mutex_lock( &s->clients_mtx ), goto error ); 302 fd_list_insert_before(&s->clients, &c->chain); 303 CHECK_POSIX_DO( pthread_mutex_unlock( &s->clients_mtx ), goto error ); 304 305 /* Start the client thread */ 306 CHECK_POSIX_DO( pthread_create( &c->thr, NULL, client_sm, c ), goto error ); 292 CHECK_MALLOC_DO( conn = fd_cnx_serv_accept(s->conn), break ); 293 294 /* Store this connection in the fifo for processing by the worker pool. Will block when the fifo is full */ 295 pthread_cleanup_push((void *)fd_cnx_destroy, conn); 296 CHECK_FCT_DO( fd_fifo_post( s->pending, &conn ), break ); 297 pthread_cleanup_pop(0); 307 298 308 299 } while (1); 309 310 300 error: 311 301 if (s) … … 324 314 { 325 315 struct server * new; 316 int i; 326 317 327 318 /* New server structure */ … … 332 323 new->proto = proto; 333 324 new->secur = secur; 334 CHECK_POSIX_DO( pthread_mutex_init(&new->clients_mtx, NULL), return NULL ); 335 fd_list_init(&new->clients, new); 325 326 CHECK_FCT_DO( fd_fifo_new(&new->pending, 5), return NULL); 327 CHECK_MALLOC_DO( new->workers = calloc( fd_g_config->cnf_thr_srv, sizeof(struct pool_workers) ), return NULL ); 328 329 for (i = 0; i < fd_g_config->cnf_thr_srv; i++) { 330 /* Create the pool */ 331 new->workers[i].s = new; 332 new->workers[i].id = i; 333 CHECK_POSIX_DO( pthread_create( &new->workers[i].worker, NULL, client_worker, &new->workers[i]), return NULL ); 334 } 336 335 337 336 return new; … … 479 478 while (!FD_IS_LIST_EMPTY(&FD_SERVERS)) { 480 479 struct server * s = (struct server *)(FD_SERVERS.next); 481 482 /* Lock client list now */ 483 CHECK_FCT_DO( pthread_mutex_lock(&s->clients_mtx), /* continue anyway */); 480 int i; 481 struct cnxctx * c; 484 482 485 483 /* cancel thread */ … … 489 487 fd_cnx_destroy(s->conn); 490 488 491 /* cancel and destroy all clients */ 492 while (!FD_IS_LIST_EMPTY(&s->clients)) { 493 struct client * c = (struct client *)(s->clients.next); 494 495 /* Destroy client's thread */ 496 CHECK_FCT_DO( fd_thr_term(&c->thr), /* continue */); 497 498 /* Destroy client's connection */ 499 fd_cnx_destroy(c->conn); 500 501 /* Unlink and free the client */ 502 fd_list_unlink(&c->chain); 503 free(c); 504 } 505 /* Unlock & destroy */ 506 CHECK_FCT_DO( pthread_mutex_unlock(&s->clients_mtx), /* continue anyway */); 507 CHECK_FCT_DO( pthread_mutex_destroy(&s->clients_mtx), /* continue */); 489 /* cancel and destroy all worker threads */ 490 for (i = 0; i < fd_g_config->cnf_thr_srv; i++) { 491 /* Destroy worker thread */ 492 CHECK_FCT_DO( fd_thr_term(&s->workers[i].worker), /* continue */); 493 } 494 free(s->workers); 495 496 /* Close any pending connection */ 497 while ( fd_fifo_tryget( s->pending, &c ) == 0 ) { 498 fd_cnx_destroy(c); 499 } 500 CHECK_FCT_DO( fd_fifo_del(&s->pending), ); 508 501 509 502 /* Now destroy the server object */
Note: See TracChangeset
for help on using the changeset viewer.