view conf/nas.a.rt.testbed.aaa/freeDiameter/python_.py @ 14:fef572dfbd43

Tested new netemul & send delay
author Sebastien Decugis <sdecugis@nict.go.jp>
date Tue, 11 Jan 2011 16:08:18 +0900
parents
children
line wrap: on
line source

gdict = cvar.fd_g_config.cnf_dict
d_si = gdict.search ( DICT_AVP, AVP_BY_NAME, "Session-Id" )
d_oh  = gdict.search ( DICT_AVP, AVP_BY_NAME, "Origin-Host" )
d_or  = gdict.search ( DICT_AVP, AVP_BY_NAME, "Origin-Realm" )
d_dh  = gdict.search ( DICT_AVP, AVP_BY_NAME, "Destination-Host" )
d_dr  = gdict.search ( DICT_AVP, AVP_BY_NAME, "Destination-Realm" )
d_rc  = gdict.search ( DICT_AVP, AVP_BY_NAME, "Result-Code" )
d_vnd = gdict.new_obj(DICT_VENDOR, 	dict_vendor_data(999999, 	"app_test_py vendor") )
d_app = gdict.new_obj(DICT_APPLICATION, dict_application_data(0xffffff, "app_test_py appli"), d_vnd)
d_req = gdict.new_obj(DICT_COMMAND, 	dict_cmd_data(0xfffffe, "Test_py-Request", 1), d_app)
d_ans = gdict.new_obj(DICT_COMMAND, 	dict_cmd_data(0xfffffe, "Test_py-Answer",  0), d_app)
d_avp = gdict.new_obj(DICT_AVP, 	dict_avp_data(0xffffff, "app_test_py avp", AVP_TYPE_INTEGER32, 999999 ))
gdict.new_obj(DICT_RULE, dict_rule_data(d_si, RULE_FIXED_HEAD, 1, 1), d_req)
gdict.new_obj(DICT_RULE, dict_rule_data(d_si, RULE_FIXED_HEAD, 1, 1), d_ans)
gdict.new_obj(DICT_RULE, dict_rule_data(d_avp, RULE_REQUIRED, 1, 1), d_req)
gdict.new_obj(DICT_RULE, dict_rule_data(d_avp, RULE_REQUIRED, 1, 1), d_ans)
gdict.new_obj(DICT_RULE, dict_rule_data(d_oh, RULE_REQUIRED, 1, 1), d_req)
gdict.new_obj(DICT_RULE, dict_rule_data(d_oh, RULE_REQUIRED, 1, 1), d_ans)
gdict.new_obj(DICT_RULE, dict_rule_data(d_or, RULE_REQUIRED, 1, 1), d_req)
gdict.new_obj(DICT_RULE, dict_rule_data(d_or, RULE_REQUIRED, 1, 1), d_ans)
gdict.new_obj(DICT_RULE, dict_rule_data(d_dr, RULE_REQUIRED, 1, 1), d_req)
gdict.new_obj(DICT_RULE, dict_rule_data(d_dh, RULE_OPTIONAL, 0, 1), d_req)
gdict.new_obj(DICT_RULE, dict_rule_data(d_rc, RULE_REQUIRED, 1, 1), d_ans)

def receive_answer(ans, testval):
   if (ans.is_request()):
     print "Request was expired before receiving answer..."
     return None
   try:
     tval = ans.search(d_avp).header().avp_value.u32
   except:
     print "Error in receive_answer: no Test-AVP included"
     tval = 0
   try:
     print "Py RECV %x (expected: %x) Status: %d From: '%s'" % (tval, testval, ans.search(d_rc).header().avp_value.u32, ans.search(d_oh).header().avp_value.os.as_str())
   except:
     print "Error in receive_answer: Result-Code or Origin-Host are missing"
   del ans
   return None

import random

def send_query(destrealm="b.rt.testbed.aaa"):
   qry = msg(d_req)
   sess = session()
   tv = random.randint(1, 1<<32)
   # Session-Id
   a = avp(d_si, AVPFL_SET_BLANK_VALUE)
   a.header().avp_value.os = sess.getsid()
   qry.add_child(a)
   # Destination-Realm
   a = avp(d_dr, AVPFL_SET_BLANK_VALUE)
   a.header().avp_value.os = destrealm
   qry.add_child(a)
   # Origin-Host, Origin-Realm
   qry.add_origin()
   # Test-AVP
   a = avp(d_avp, AVPFL_SET_BLANK_VALUE)
   a.header().avp_value.u32 = tv
   qry.add_child(a)
   print "Py SEND %x to '%s'" % (tv, destrealm)
   qry.send(receive_answer, tv, 1)

send_query()


"Welcome to our mercurial repository"