Mercurial > hg > freeDiameter
view contrib/tools/csv_to_fd @ 1464:1404de313b85
csv_to_fd: consistent comparison order
author | Luke Mewburn <luke@mewburn.net> |
---|---|
date | Tue, 10 Mar 2020 09:33:41 +1100 |
parents | 8f6c77f24b1a |
children | 48fa8d70e6ad |
line wrap: on
line source
#!/usr/bin/env python

"""
Convert CSV files containing RADIUS or Diameter AVP tables
into various formats.

Format of the CSV files is one of:
- Row per 3GPP AVP tables:
    Name, Code, Section, DataType, Must, May, ShouldNot, MustNot [, ...]
    - Name:
        AVP Name. String, validated as ALPHA *(ALPHA / DIGIT / "-")
        per RFC 6733 section 3.2.
    - Code:
        AVP Code. Integer, 0..4294967295.
    - Section:
        Section in relevant standard. String.
    - DataType:
        AVP Data Type. String, validated per basic and derived types in:
        - RFC 6733 section 4.2
        - RFC 6733 section 4.3
        - RFC 7155 section 4.1
    - Must, May, ShouldNot, MustNot:
        Flags, possibly comma or space separated: M, V

- Comment row. First cell:
    # Comment text      Comment text
    #=                  Header row of ====
    #                   Blank line

- Parameter row:
    @Parameter,Value [, ...]
  Supported Parameter terms:
    standard    Standard name. E.g. '3GPP TS 29.272', 'RFC 6733'.
    vendor      Vendor number.

"""

from __future__ import print_function
from __future__ import with_statement

import abc
import csv
import collections
import json
import re
import optparse
import sys

# Column names assigned, in order, to the cells of each CSV row.
CSV_COLUMN_NAMES = [
    'name',
    'code',
    'section',
    'datatype',
    'must',
    'may',
    'shouldnot',
    'mustnot',
    'encrypt',
]

# Derived AVP data type -> base AVP data type.
DERIVED_TO_BASE = {
    'Address': 'OctetString',           # RFC 6733 section 4.3.1
    'Time': 'OctetString',              # RFC 6733 section 4.3.1
    'UTF8String': 'OctetString',        # RFC 6733 section 4.3.1
    'DiameterIdentity': 'OctetString',  # RFC 6733 section 4.3.1
    'DiameterURI': 'OctetString',       # RFC 6733 section 4.3.1
    'Enumerated': 'Integer32',          # RFC 6733 section 4.3.1
    'IPFilterRule': 'OctetString',      # RFC 6733 section 4.3.1
    'QoSFilterRule': 'OctetString',     # RFC 7155 section 4.1.1
}

# See https://www.iana.org/assignments/enterprise-numbers/enterprise-numbers
VENDOR_TO_NAME = {
    0: '',
    193: 'Ericsson',
    8164: 'Starent',
    10415: '3GPP',
}


class Avp(object):
    """Store an AVP row."""

    # Regex to validate avp-name per RFC 6733 section 3.2,
    # with changes:
    # - Allow avp-name to start with numbers (for 3GPP)
    # - Allow '.' in avp-name, for existing dict_dcca_3gpp usage.
    # TODO: if starts with digit, ensure contains a letter somewhere?
    _name_re = re.compile(r'^[a-zA-Z0-9][a-zA-Z0-9-\.]*$')

    __slots__ = CSV_COLUMN_NAMES + [
        'filename',
        'linenum',
        'standard',
        'vendor',
    ]

    def __init__(self, name, code, section, datatype,
                 must, may, shouldnot, mustnot, encrypt,
                 filename='', linenum=0, standard='', vendor=0):
        """Store and validate one AVP definition.

        The first nine parameters are the CSV cells, in
        CSV_COLUMN_NAMES order; `code` is converted with int().
        `filename`, `linenum`, `standard` and `vendor` describe the
        file state in effect when the row was read.

        Raises ValueError if the name, code or data type is invalid
        (including when `code` is a string int() cannot parse).
        """
        # Members from CSV row
        self.name = name
        self.code = int(code)
        self.section = section
        self.datatype = datatype
        self.must = must
        self.may = may
        self.shouldnot = shouldnot
        self.mustnot = mustnot
        self.encrypt = encrypt
        # Members from file state
        self.filename = filename
        self.linenum = linenum
        self.standard = standard
        self.vendor = vendor
        # Validate CSV fields
        if not self._name_re.match(self.name):
            raise ValueError('Invalid AVP name "{}"'.format(self.name))
        if (self.code < 0 or self.code > 4294967295):
            raise ValueError('Invalid AVP code {}'.format(self.code))
        if (self.datatype not in (
                'OctetString', 'Integer32', 'Integer64', 'Unsigned32',
                'Unsigned64', 'Float32', 'Float64', 'Grouped')
                and self.datatype not in DERIVED_TO_BASE):
            raise ValueError('Invalid AVP data type "{}"'.format(
                self.datatype))
        # TODO: validate must, may, shouldnot, mustnot

    @property
    def __dict__(self):
        """Return the slot values as a dict, so that vars(avp) works
        even though __slots__ removes the per-instance __dict__."""
        return {s: getattr(self, s) for s in self.__slots__}


class Processor(object):
    """Interface for processor of Avp"""

    # NOTE: this is the Python 2 metaclass spelling. Python 3 ignores
    # the __metaclass__ attribute, so the @abc.abstractmethod markers
    # below are only enforced when running under Python 2.
    __metaclass__ = abc.ABCMeta

    @classmethod
    def cls_name(cls):
        """Return the name, lower-case, without "processor" suffix."""
        suffix = 'processor'
        name = cls.__name__.lower()
        if name.endswith(suffix):
            return name[:-len(suffix)]
        return name

    @classmethod
    def cls_desc(cls):
        """Return the first line of the docstring."""
        if cls.__doc__ is None:
            return ""
        return cls.__doc__.split('\n')[0]

    @abc.abstractmethod
    def next_file(self, filename):
        """Called when a file is opened."""
        pass

    @abc.abstractmethod
    def avp(self, avp):
        """Process a validated Avp."""
        pass

    @abc.abstractmethod
    def comment(self, comment, filename, linenum):
        """Process a comment row:
        #comment,
        """
        pass

    @abc.abstractmethod
    def generate(self):
        """Invoked after all rows processed."""
        pass


class DebugProcessor(Processor):
    """Display the CSV parsing"""

    def next_file(self, filename):
        """Print the name of the file being opened."""
        print('File: {}'.format(filename))

    def avp(self, avp):
        """Print the name, code and data type of the AVP."""
        avpdict = vars(avp)
        print('AVP: {name}, {code}, {datatype}'.format(**avpdict))

    def comment(self, comment, filename, linenum):
        """Print the comment text."""
        print('Comment: {}'.format(comment))

    def generate(self):
        """Print a marker once all rows have been processed."""
        print('Generate')


class NoopProcessor(Processor):
    """Validate the CSV; no other output"""

    def next_file(self, filename):
        """Do nothing."""
        pass

    def avp(self, avp):
        """Do nothing; the Avp was validated when it was constructed."""
        pass

    def comment(self, comment, filename, linenum):
        """Do nothing."""
        pass

    def generate(self):
        """Do nothing."""
        pass


class FdcProcessor(Processor):
    """Generate freeDiameter C code

    Comment cells are parsed as:
        # text comment      /* text comment */
        #=                  /*==============*/
        #                   [blank line]
    """

    COMMENT_WIDTH = 64

    def __init__(self):
        # Generated lines are buffered here and printed by generate().
        self.lines = []

    def next_file(self, filename):
        """Print (immediately, not buffered) a C comment naming the file."""
        print('/* CSV file: {} */'.format(filename))

    def avp(self, avp):
        """Buffer the C block that registers `avp` in the dictionary.

        Raises ValueError if an Enumerated AVP uses a vendor that is
        not listed in VENDOR_TO_NAME.
        """
        # Resolve the vendor prefix before buffering anything, so that
        # an unknown vendor is reported as a ValueError (which main()
        # reports with file and line) rather than escaping as KeyError.
        vendor_prefix = ''
        if avp.datatype == 'Enumerated':
            vendor_prefix = self._vendor_prefix(avp.vendor)

        comment = '{name}, {datatype}, code {code}'.format(**vars(avp))
        if avp.section != '':
            comment += ', section {}'.format(avp.section)
        self.add_comment(comment)
        self.add('\t{')
        self.add('\t\tstruct dict_avp_data data = {')
        # TODO: remove comments?
        self.add('\t\t\t{},\t/* Code */'.format(avp.code))
        self.add('\t\t\t{},\t/* Vendor */'.format(avp.vendor))
        self.add('\t\t\t\"{}\",\t/* Name */'.format(avp.name))
        self.add('\t\t\t{},\t/* Fixed flags */'.format(
            self.build_flags(', '.join([avp.must, avp.mustnot]))))
        self.add('\t\t\t{},\t/* Fixed flag values */'.format(
            self.build_flags(avp.must)))
        # TODO: add trailing comma?
        self.add('\t\t\tAVP_TYPE_{}\t/* base type of data */'.format(
            DERIVED_TO_BASE.get(avp.datatype, avp.datatype).upper()))
        self.add('\t\t};')
        avp_type = 'NULL'
        if avp.datatype == 'Enumerated':
            self.add('\t\tstruct dict_object\t*type;')
            self.add(
                '\t\tstruct dict_type_data\t tdata = {{ AVP_TYPE_INTEGER32, '
                '"Enumerated({prefix}{name})", NULL, NULL, NULL }};'.format(
                    prefix=vendor_prefix, name=avp.name))
            # XXX: add enumerated values
            self.add('\t\tCHECK_dict_new(DICT_TYPE, &tdata, NULL, &type);')
            avp_type = "type"
        elif avp.datatype in DERIVED_TO_BASE:
            avp_type = '{}_type'.format(avp.datatype)
        self.add('\t\tCHECK_dict_new(DICT_AVP, &data, {}, NULL);'.format(
            avp_type))
        # TODO: remove ; on scope brace
        self.add('\t};')
        self.add('')

    def comment(self, comment, filename, linenum):
        """Buffer a blank line, a header row or a text comment.

        `comment` is the cell text after the leading '#'.
        Raises ValueError for any other form.
        """
        if comment == '':
            self.add('')
        elif comment == '=':
            self.add_header()
        elif comment.startswith(' '):
            self.add_comment(comment[1:])
        else:
            raise ValueError('Unsupported comment "{}"'.format(comment))

    def generate(self):
        """Print the banner, the buffered lines, and the trailer."""
        self.print_header()
        self.print_comment('Start of generated data.')
        self.print_comment('')
        self.print_comment('The following is created automatically with:')
        self.print_comment(' csv_to_fd -p {}'.format(self.cls_name()))
        self.print_comment('Changes will be lost during the next update.')
        self.print_comment('Do not modify;'
                           ' modify the source .csv file instead.')
        self.print_header()
        print('')
        print('\n'.join(self.lines))
        self.print_header()
        self.print_comment('End of generated data.')
        self.print_header()

    def build_flags(self, flags):
        """Return the C flag expression for the flag letters in `flags`.

        'V' maps to AVP_FLAG_VENDOR and 'M' to AVP_FLAG_MANDATORY.
        Returns '0' when neither letter is present, so that the
        generated struct initializer never contains an empty expression.
        """
        result = []
        if 'V' in flags:
            result.append('AVP_FLAG_VENDOR')
        if 'M' in flags:
            result.append('AVP_FLAG_MANDATORY')
        if not result:
            return '0'
        return ' |'.join(result)

    @staticmethod
    def _vendor_prefix(vendor):
        """Return 'VendorName/' for a non-zero vendor, else ''.

        Raises ValueError if `vendor` is not in VENDOR_TO_NAME.
        """
        if vendor == 0:
            return ''
        if vendor not in VENDOR_TO_NAME:
            raise ValueError(
                'Unknown vendor {}; not present in VENDOR_TO_NAME'.format(
                    vendor))
        return '{}/'.format(VENDOR_TO_NAME[vendor])

    def add(self, line):
        """Buffer one output line verbatim."""
        self.lines.append(line)

    def add_comment(self, comment):
        """Buffer `comment` formatted as a C comment."""
        self.lines.append(self.format_comment(comment))

    def add_header(self):
        """Buffer a header row of '='."""
        self.lines.append(self.format_header())

    def format_comment(self, comment):
        """Return `comment` as a C comment padded to COMMENT_WIDTH."""
        return '\t/* {:<{width}} */'.format(comment, width=self.COMMENT_WIDTH)

    def format_header(self):
        """Return a C comment row of '=' matching format_comment's width."""
        return '\t/*={:=<{width}}=*/'.format('', width=self.COMMENT_WIDTH)

    def print_comment(self, comment):
        """Print `comment` formatted as a C comment."""
        print(self.format_comment(comment))

    def print_header(self):
        """Print a header row of '='."""
        print(self.format_header())


class JsonProcessor(Processor):
    """Generate freeDiameter JSON object
    """

    def __init__(self):
        # One OrderedDict per AVP, in input order.
        self.avps = []

    def next_file(self, filename):
        """Do nothing."""
        pass

    def avp(self, avp):
        """Record the AVP as an ordered mapping for generate()."""
        flags = collections.OrderedDict([
            ('Must', self.build_flags(avp.must)),
            ('MustNot', self.build_flags(avp.mustnot)),
        ])
        row = collections.OrderedDict([
            ('Code', avp.code),
            ('Flags', flags),
            ('Name', avp.name),
            ('Type', avp.datatype),
            ('Vendor', avp.vendor),
        ])
        self.avps.append(row)

    def comment(self, comment, filename, linenum):
        """Do nothing; comments are not part of the JSON output."""
        pass

    def generate(self):
        """Print all recorded AVPs as one JSON document."""
        doc = {"AVPs": self.avps}
        print(json.dumps(doc, indent=2))

    def build_flags(self, flags):
        """Return 'V' and/or 'M' (in that order) as present in `flags`."""
        result = []
        if 'V' in flags:
            result.append('V')
        if 'M' in flags:
            result.append('M')
        return ''.join(result)


def _normalise_row(row):
    """Return a dict with exactly the CSV_COLUMN_NAMES keys.

    csv.DictReader stores surplus cells under the key None and fills
    missing cells with None. Surplus cells are dropped and missing
    cells become '', so the result is safe to pass as Avp(**kwargs).
    """
    return {key: (row.get(key) or '') for key in CSV_COLUMN_NAMES}


def _process_file(filename, avpproc, avp_codes):
    """Feed every row of one CSV file to `avpproc`.

    `avp_codes` is the dict of [vendor][code] : Avp shared across all
    files; it is updated in place and used to reject duplicates.
    On a ValueError the error is written to stderr, prefixed with the
    file name and line number, and the program exits with status 1.
    """
    avpproc.next_file(filename)
    with open(filename, 'r') as csvfile:
        csvdata = csv.DictReader(csvfile, CSV_COLUMN_NAMES)
        standard = ''
        vendor = 0
        for rawrow in csvdata:
            # DictReader skips blank lines, so a hand-kept counter would
            # drift; line_num is the reader's own count of lines read.
            linenum = csvdata.line_num
            row = _normalise_row(rawrow)
            name = row['name']
            try:
                if name in ('', 'Attribute Name'):
                    continue
                elif name.startswith('#'):
                    avpproc.comment(name[1:], filename, linenum)
                elif name.startswith('@'):
                    parameter = name[1:]
                    value = row['code']
                    if parameter == 'standard':
                        standard = value
                    elif parameter == 'vendor':
                        vendor = int(value)
                    else:
                        raise ValueError('Unknown parameter "{}"'.format(
                            parameter))
                else:
                    avp = Avp(filename=filename, linenum=linenum,
                              standard=standard, vendor=vendor,
                              **row)
                    # Ensure AVP vendor/code not already defined
                    if avp.code in avp_codes[avp.vendor]:
                        conflict = avp_codes[avp.vendor][avp.code]
                        raise ValueError(
                            'AVP vendor {} code {} already present'
                            ' in file "{}" line {}'.format(
                                avp.vendor, avp.code,
                                conflict.filename, conflict.linenum))
                    avp_codes[avp.vendor][avp.code] = avp
                    # Process AVP
                    avpproc.avp(avp)
            except ValueError as e:
                sys.stderr.write('CSV file "{}" line {}: {}: {}\n'.format(
                    filename, linenum, e.__class__.__name__, e))
                sys.exit(1)


def main():
    """Parse the command line, process each CSV file, emit the result."""

    # Build dict of name: NameProcessor
    processors = {
        cls.cls_name(): cls
        for cls in Processor.__subclasses__()
    }

    # Build Processor name to desc
    processor_help = '\n'.join(
        [' {:8} {}'.format(key, processors[key].cls_desc())
         for key in sorted(processors)])

    # Custom OptionParser with improved help
    class MyParser(optparse.OptionParser):
        """Custom OptionParser without epilog formatting."""
        def format_help(self, formatter=None):
            return """\
{}
Supported PROCESSOR options:
{}
""".format(
                optparse.OptionParser.format_help(self, formatter),
                processor_help)

    # Parse options
    parser = MyParser(
        description="""\
Convert CSV files containing RADIUS or Diameter AVP tables
into various formats using the specified processor PROCESSOR.
""")
    parser.add_option(
        '-p', '--processor',
        default='noop',
        # Sorted so that the help text is the same on every run.
        help='AVP processor. One of: {}. [%default]'.format(
            ', '.join(sorted(processors))))
    (opts, args) = parser.parse_args()
    if len(args) < 1:
        parser.error('Incorrect number of arguments')

    # Find processor
    try:
        avpproc = processors[opts.processor]()
    except KeyError:
        parser.error('Unknown processor "{}"'.format(opts.processor))

    # dict of [vendor][code] : Avp
    avp_codes = collections.defaultdict(dict)

    # Process files
    for filename in args:
        _process_file(filename, avpproc, avp_codes)

    # Generate result
    avpproc.generate()


if __name__ == '__main__':
    main()

# vim: set et sw=4 sts=4 :