Mercurial > hg > fD-testbed
view conf/nas.a.rt.testbed.aaa/freeDiameter/python_.py @ 14:fef572dfbd43
Tested new netemul & send delay
author | Sebastien Decugis <sdecugis@nict.go.jp> |
---|---|
date | Tue, 11 Jan 2011 16:08:18 +0900 |
parents | |
children |
line wrap: on
line source
gdict = cvar.fd_g_config.cnf_dict d_si = gdict.search ( DICT_AVP, AVP_BY_NAME, "Session-Id" ) d_oh = gdict.search ( DICT_AVP, AVP_BY_NAME, "Origin-Host" ) d_or = gdict.search ( DICT_AVP, AVP_BY_NAME, "Origin-Realm" ) d_dh = gdict.search ( DICT_AVP, AVP_BY_NAME, "Destination-Host" ) d_dr = gdict.search ( DICT_AVP, AVP_BY_NAME, "Destination-Realm" ) d_rc = gdict.search ( DICT_AVP, AVP_BY_NAME, "Result-Code" ) d_vnd = gdict.new_obj(DICT_VENDOR, dict_vendor_data(999999, "app_test_py vendor") ) d_app = gdict.new_obj(DICT_APPLICATION, dict_application_data(0xffffff, "app_test_py appli"), d_vnd) d_req = gdict.new_obj(DICT_COMMAND, dict_cmd_data(0xfffffe, "Test_py-Request", 1), d_app) d_ans = gdict.new_obj(DICT_COMMAND, dict_cmd_data(0xfffffe, "Test_py-Answer", 0), d_app) d_avp = gdict.new_obj(DICT_AVP, dict_avp_data(0xffffff, "app_test_py avp", AVP_TYPE_INTEGER32, 999999 )) gdict.new_obj(DICT_RULE, dict_rule_data(d_si, RULE_FIXED_HEAD, 1, 1), d_req) gdict.new_obj(DICT_RULE, dict_rule_data(d_si, RULE_FIXED_HEAD, 1, 1), d_ans) gdict.new_obj(DICT_RULE, dict_rule_data(d_avp, RULE_REQUIRED, 1, 1), d_req) gdict.new_obj(DICT_RULE, dict_rule_data(d_avp, RULE_REQUIRED, 1, 1), d_ans) gdict.new_obj(DICT_RULE, dict_rule_data(d_oh, RULE_REQUIRED, 1, 1), d_req) gdict.new_obj(DICT_RULE, dict_rule_data(d_oh, RULE_REQUIRED, 1, 1), d_ans) gdict.new_obj(DICT_RULE, dict_rule_data(d_or, RULE_REQUIRED, 1, 1), d_req) gdict.new_obj(DICT_RULE, dict_rule_data(d_or, RULE_REQUIRED, 1, 1), d_ans) gdict.new_obj(DICT_RULE, dict_rule_data(d_dr, RULE_REQUIRED, 1, 1), d_req) gdict.new_obj(DICT_RULE, dict_rule_data(d_dh, RULE_OPTIONAL, 0, 1), d_req) gdict.new_obj(DICT_RULE, dict_rule_data(d_rc, RULE_REQUIRED, 1, 1), d_ans) def receive_answer(ans, testval): if (ans.is_request()): print "Request was expired before receiving answer..." return None try: tval = ans.search(d_avp).header().avp_value.u32 except: print "Error in receive_answer: no Test-AVP included" tval = 0 try: print "Py RECV %x (expected: %x) Status: %d From: '%s'" % (tval, testval, ans.search(d_rc).header().avp_value.u32, ans.search(d_oh).header().avp_value.os.as_str()) except: print "Error in receive_answer: Result-Code or Origin-Host are missing" del ans return None import random def send_query(destrealm="b.rt.testbed.aaa"): qry = msg(d_req) sess = session() tv = random.randint(1, 1<<32) # Session-Id a = avp(d_si, AVPFL_SET_BLANK_VALUE) a.header().avp_value.os = sess.getsid() qry.add_child(a) # Destination-Realm a = avp(d_dr, AVPFL_SET_BLANK_VALUE) a.header().avp_value.os = destrealm qry.add_child(a) # Origin-Host, Origin-Realm qry.add_origin() # Test-AVP a = avp(d_avp, AVPFL_SET_BLANK_VALUE) a.header().avp_value.u32 = tv qry.add_child(a) print "Py SEND %x to '%s'" % (tv, destrealm) qry.send(receive_answer, tv, 1) send_query()