Mercurial > hg > freeDiameter
view extensions/app_acct/acct_db.c @ 1249:4ad4d614acfa
Attempt to produce a CEA that is compliant with the ABNF when the Error bit is not set. Thanks Antonio Martinez for the report -- http://lists.freediameter.net/pipermail/dev/2013-December/000263.html
author | Sebastien Decugis <sdecugis@freediameter.net> |
---|---|
date | Mon, 16 Dec 2013 15:55:36 +0100 |
parents | 0b4abb03bcaf |
children |
line wrap: on
line source
/*********************************************************************************************************
* Software License Agreement (BSD License)
* Author: Sebastien Decugis <sdecugis@freediameter.net>
*
* Copyright (c) 2013, WIDE Project and NICT
* All rights reserved.
*
* Redistribution and use of this software in source and binary forms, with or without modification, are
* permitted provided that the following conditions are met:
*
* * Redistributions of source code must retain the above
*   copyright notice, this list of conditions and the
*   following disclaimer.
*
* * Redistributions in binary form must reproduce the above
*   copyright notice, this list of conditions and the
*   following disclaimer in the documentation and/or other
*   materials provided with the distribution.
*
* * Neither the name of the WIDE Project or NICT nor the
*   names of its contributors may be used to endorse or
*   promote products derived from this software without
*   specific prior written permission of WIDE Project and
*   NICT.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS "AS IS" AND ANY EXPRESS OR IMPLIED
* WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A
* PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR CONTRIBUTORS BE LIABLE FOR
* ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS
* INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR
* TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF
* ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*********************************************************************************************************/

/* Database interface module */

/* There is one connection to the db per thread.
 * The connection is stored in the pthread_key_t variable */

#include "app_acct.h"
#include <libpq-fe.h>

/* SQL type name used in the "$n::type" cast of the prepared statement, indexed by AVP base type */
const char * diam2db_types_mapping[AVP_TYPE_MAX + 1] = {
	""			/* AVP_TYPE_GROUPED */,
	"bytea"			/* AVP_TYPE_OCTETSTRING */,
	"integer"		/* AVP_TYPE_INTEGER32 */,
	"bigint"		/* AVP_TYPE_INTEGER64 */,
	"integer"		/* AVP_TYPE_UNSIGNED32 + cast */,
	"bigint"		/* AVP_TYPE_UNSIGNED64 + cast */,
	"real"			/* AVP_TYPE_FLOAT32 */,
	"double precision"	/* AVP_TYPE_FLOAT64 */
};

/* Name under which the INSERT statement is prepared on every connection */
static const char * stmt = "acct_db_stmt";

/* Per-thread storage of the PGconn handle */
#ifndef TEST_DEBUG
static
#endif /* TEST_DEBUG */
pthread_key_t connk;

static int    connk_created = 0; /* Set once connk has been created, so that we never delete an invalid key */
static char * sql = NULL;        /* The buffer that will contain the SQL query */
static int    nbrecords = 0;     /* Number of AVP records mapped in the prepared statement */


/* Initialize the database context: connection to the DB, prepared statement to insert new records.
 *
 * Builds the text of the INSERT statement in the file-level "sql" buffer from acct_config and from the
 * list returned by acct_rec_prepare(), prepares it on a first connection, and stores that connection
 * in the calling thread's slot of "connk".
 *
 * Returns 0 on success, EINVAL if the connection or the statement preparation fails, or the error
 * reported by the CHECK_* macros.
 */
int acct_db_init(void)
{
	struct acct_record_list emptyrecords;
	struct fd_list * li;
	size_t sql_allocd = 0; /* The malloc'd size of the buffer */
	size_t sql_offset = 0; /* The actual data already written in this buffer */
	int idx = 0;
	PGresult * res;
	PGconn *conn;
	#define REALLOC_SIZE	1024	/* We extend the buffer by this amount */

	TRACE_ENTRY();
	CHECK_PARAMS( acct_config && acct_config->conninfo && acct_config->tablename );

	CHECK_PARAMS_DO( PQisthreadsafe() == 1, {
		fd_log_debug("You PostGreSQL installation is not thread-safe!");
		return EINVAL;
	} );

	/* Use the information from acct_config to create the connection and prepare the query */
	conn = PQconnectdb(acct_config->conninfo);

	/* Check to see that the backend connection was successfully made */
	if (PQstatus(conn) != CONNECTION_OK) {
		fd_log_debug("Connection to database failed: %s", PQerrorMessage(conn));
		/* The PGconn object must be released even when the connection attempt failed */
		PQfinish(conn);
		acct_db_free();
		return EINVAL;
	}
	if (PQprotocolVersion(conn) < 3) {
		fd_log_debug("Database protocol version is too old, version 3 is required for prepared statements.");
		PQfinish(conn);
		acct_db_free();
		return EINVAL;
	}

	TRACE_DEBUG(FULL, "Connection to database successful, server version %d.", PQserverVersion(conn));

	/* Now, prepare the request object */

	/* First, we build the list of AVP we will insert in the database */
	CHECK_FCT( acct_rec_prepare(&emptyrecords) );

	/* Now, prepare the text of the request */
	CHECK_MALLOC(sql = malloc(REALLOC_SIZE));
	sql_allocd = REALLOC_SIZE;

	/* This macro hides the details of extending the buffer on each sprintf... */
	#define ADD_EXTEND(args...) {									\
		size_t p;										\
		int loop = 0;										\
		do {											\
			p = snprintf(sql + sql_offset, sql_allocd - sql_offset, ##args);		\
			if (p >= sql_allocd - sql_offset) {						\
				/* Too short, extend the buffer and start again */			\
				CHECK_MALLOC( sql = realloc(sql, sql_allocd + REALLOC_SIZE) );		\
				sql_allocd += REALLOC_SIZE;						\
				loop++;									\
				ASSERT(loop < 100); /* detect infinite loops */				\
				continue;								\
			}										\
			sql_offset += p;								\
			break;										\
		} while (1);										\
	}

	/* This macro allows to add a value in the SQL buffer while escaping it properly */
	#define ADD_ESCAPE(str) {									\
		char * __s = (char *)str;								\
		/* Check we have at least twice the size available +1 */				\
		size_t p = strlen(__s);									\
													\
		while (sql_allocd - sql_offset < 2 * p + 1) {						\
			/* Too short, extend the buffer */						\
			CHECK_MALLOC( sql = realloc(sql, sql_allocd + REALLOC_SIZE) );			\
			sql_allocd += REALLOC_SIZE;							\
		}											\
													\
		/* Now add the escaped string */							\
		p = PQescapeStringConn(conn, sql+sql_offset, __s, p, NULL);				\
		sql_offset += p;									\
	}

	/* INSERT INTO table (tsfield, field1, field2, ...) VALUES (now, $1::bytea, $2::integer, ...) */
	ADD_EXTEND("INSERT INTO %s (", acct_config->tablename);

	/* First part: the list of column names */
	if (acct_config->tsfield) {
		ADD_EXTEND("\"");
		ADD_ESCAPE(acct_config->tsfield);
		ADD_EXTEND("\", ");
	}

	if (acct_config->srvnfield) {
		ADD_EXTEND("\"");
		ADD_ESCAPE(acct_config->srvnfield);
		ADD_EXTEND("\", ");
	}

	for (li = emptyrecords.all.next; li != &emptyrecords.all; li = li->next) {
		struct acct_record_item * i = (struct acct_record_item *)(li->o);
		ADD_EXTEND("\"");
		/* The column is named after the configured field, or the AVP name when no field is set */
		ADD_ESCAPE(i->param->field?:i->param->avpname);
		if (i->index) {
			ADD_EXTEND("%d", i->index);
		}
		if (li->next != &emptyrecords.all) {
			ADD_EXTEND("\", ");
		}
	}

	/* Second part: the list of values / placeholders, in the same order */
	ADD_EXTEND("\") VALUES (");

	if (acct_config->tsfield) {
		/* The timestamp is passed as the first parameter of the statement */
		++idx;
		ADD_EXTEND("$%d, ", idx);
	}
	if (acct_config->srvnfield) {
		/* The server name is constant, it is written directly in the statement */
		ADD_EXTEND("'");
		ADD_ESCAPE(fd_g_config->cnf_diamid);
		ADD_EXTEND("', ");
	}

	for (li = emptyrecords.all.next; li != &emptyrecords.all; li = li->next) {
		struct acct_record_item * i = (struct acct_record_item *)(li->o);
		++idx;
		ADD_EXTEND("$%d::%s", idx, diam2db_types_mapping[i->param->avptype]);

		if (li->next != &emptyrecords.all) {
			ADD_EXTEND(", ");
		}
	}

	ADD_EXTEND(");");

	TRACE_DEBUG(FULL, "Preparing the following SQL statement: '%s'", sql);
	res = PQprepare(conn, stmt, sql, emptyrecords.nball, NULL);
	if (PQresultStatus(res) != PGRES_COMMAND_OK) {
		TRACE_DEBUG(INFO, "Preparing statement '%s' failed: %s",
			sql, PQerrorMessage(conn));
		PQclear(res);
		/* Do not leak the record list nor the connection, which is not yet stored in connk */
		acct_rec_empty(&emptyrecords);
		PQfinish(conn);
		return EINVAL;
	}
	PQclear(res);
	nbrecords = emptyrecords.nball;

	acct_rec_empty(&emptyrecords);

	/* PQfinish is registered as the destructor of the per-thread value */
	CHECK_POSIX( pthread_key_create(&connk, (void (*)(void*))PQfinish) );
	connk_created = 1;

	CHECK_POSIX( pthread_setspecific(connk, conn) );

	/* Ok, ready */
	return 0;
}

/* Terminate the connection to the DB.
 *
 * Deletes the per-thread key (only if acct_db_init actually created it) and releases the SQL buffer.
 * NOTE(review): pthread_key_delete does not invoke the key destructor, so connections still
 * attached to live threads are presumably not closed by this function -- confirm.
 */
void acct_db_free(void)
{
	if (connk_created) {
		CHECK_POSIX_DO(pthread_key_delete(connk) , );
		connk_created = 0;
	}
	free(sql);
	sql = NULL;
}

/* When a new message has been received, insert the content of the parsed mapping into the DB (using prepared statement).
 *
 * records: the list of record items to write; items without a value are passed as NULL parameters.
 *
 * Returns 0 on success, ENOTCONN if the connection to the server is lost and cannot be restored,
 * EINVAL if preparing or executing the statement fails, ENOMEM on allocation failure.
 */
int acct_db_insert(struct acct_record_list * records)
{
	char 	**val;
	int	 *val_len;
	int 	 *val_isbin;
	int	  idx = 0;
	int	  size = 0;
	PGresult *res;
	struct fd_list *li;
	PGconn   *conn;
	int	  new = 0; /* Set when the prepared statement must be (re)created on this connection */

	TRACE_ENTRY("%p", records);
	CHECK_PARAMS( records );

	/* Retrieve the connection of this thread, or create it on first use */
	conn = pthread_getspecific(connk);
	if (!conn) {
		conn = PQconnectdb(acct_config->conninfo);
		CHECK_POSIX( pthread_setspecific(connk, conn) );

		new = 1;
	}

	/* First, check if the connection with the DB has not staled, and eventually try to fix it */
	if (PQstatus(conn) != CONNECTION_OK) {
		/* Attempt a reset */
		PQreset(conn);
		if (PQstatus(conn) != CONNECTION_OK) {
			TRACE_DEBUG(INFO, "Lost connection to the database server, and attempt to reestablish it failed");
			TODO("Terminate the freeDiameter instance completely?");
			return ENOTCONN;
		}
		/* The reset opened a new session with the server: prepared statements belong to a
		 * session, so the one from the previous session is gone and must be created again. */
		new = 1;
	}

	if (new) {
		/* Create the prepared statement for this connection, it is not shared */
		res = PQprepare(conn, stmt, sql, nbrecords, NULL);
		if (PQresultStatus(res) != PGRES_COMMAND_OK) {
			TRACE_DEBUG(INFO, "Preparing statement '%s' failed: %s",
				sql, PQerrorMessage(conn));
			PQclear(res);
			return EINVAL;
		}
		PQclear(res);
	}

	/* One parameter per record, plus one for the timestamp if it is configured */
	size = acct_config->tsfield ? records->nball + 1 : records->nball;

	/* Alloc the arrays of parameters (released on every failure path) */
	CHECK_MALLOC( val       = calloc(size, sizeof(const char *)) );
	CHECK_MALLOC_DO( val_len   = calloc(size, sizeof(const int)), { free(val); return ENOMEM; } );
	CHECK_MALLOC_DO( val_isbin = calloc(size, sizeof(const int)), { free(val); free(val_len); return ENOMEM; } );

	if (acct_config->tsfield) {
		/* The timestamp is sent as the text value "now" */
		val[idx] = "now";
		val_len[idx] = 3;
		val_isbin[idx] = 0;
		idx++;
	}

	/* Now write all the map'd records in these arrays */
	for (li = records->all.next; li != &records->all; li = li->next) {
		struct acct_record_item * r = (struct acct_record_item *)(li->o);
		if (r->value) {
			val_isbin[idx] = 1; /* We always pass binary parameters */
			switch (r->param->avptype) {
				case AVP_TYPE_OCTETSTRING:
					val[idx] = (void *)(r->value->os.data);
					val_len[idx] = r->value->os.len;
					break;

				case AVP_TYPE_INTEGER32:
				case AVP_TYPE_UNSIGNED32:
				case AVP_TYPE_FLOAT32:
					/* Binary parameters are sent in network byte order */
					r->scalar.v32 = htonl(r->value->u32);
					val[idx] = &r->scalar.c;
					val_len[idx] = sizeof(uint32_t);
					break;

				case AVP_TYPE_INTEGER64:
				case AVP_TYPE_UNSIGNED64:
				case AVP_TYPE_FLOAT64:
					r->scalar.v64 = htonll(r->value->u64);
					val[idx] = &r->scalar.c;
					val_len[idx] = sizeof(uint64_t);
					break;

				default:
					ASSERT(0); /* detect bugs */
			}
		}
		/* When there is no value, the calloc'd NULL entry is left untouched */

		idx++;
	}

	/* OK, now execute the SQL statement */
	res = PQexecPrepared(conn, stmt, size, (const char * const *)val, val_len, val_isbin, 1 /* We actually don't care here */);

	/* Done with the parameters */
	free(val);
	free(val_len);
	free(val_isbin);

	/* Now check the result code */
	if (PQresultStatus(res) != PGRES_COMMAND_OK) {
		TRACE_DEBUG(INFO, "An error occurred while INSERTing in the database: %s", PQerrorMessage(conn));
		PQclear(res);
		return EINVAL; /* It was probably a mistake in configuration file... */
	}
	PQclear(res);

	/* Ok, we are done */
	return 0;
}