DTMF Stresstesting

import threading,time,serial,sys
from random import randrange
 
port_snd=14
port_recv=2
recnt=0
ser_snd=serial.Serial(port_snd,115200,timeout=0)
ser_recv=serial.Serial(port_recv,115200,timeout=0)
con = threading.Condition()
flag=0
recv_rdd=[]
snd_rdd=[]
sndset=['0','1','2','3','4','5','6','7','8','9','A','B','C','D','*','#']
filename='log_dtmg_'
fid=open(filename+time.strftime('%m_%d_%H_%M')+'.log','w')
 
 
def recv_rd():
      ss =ser_recv.readlines()
      for ln in ss:
            print ln+'                                         <-Recv: '
            fid.write(ln+' Length of rd is '+str(len(ss))+' ')
            fid.flush()
      return ss
def snd_rd():
      ss =ser_snd.readlines()
      for ln in ss:
            print ln+'                                        ->Snd'
            fid.write(ln+' Length of rd is'+str(len(ss))+' ')
            fid.flush()
      return ss 
 
 
#recv
def recv():
      global flag,sndc
      if con.acquire():
            while True:
                  if flag%2==1:
                        print str(flag)+' '+'Recv...active...'
                        fid.write(str(flag)+' '+'Recv...active... ')
                        recnt=0
                        while '+TONE:'+str(ord(sndc))+' ' not in recv_rd():
                              recnt+=1
                              time.sleep(0.05)
                              if recnt>20:
                                    print str(flag)+' '+'                                                     <-Recv failed!!'
                                    flag+=1
                                    con.notify()
                                    break
                        else:
                              print str(flag)+' '+'                                                            <-Recv Success!!'
                              flag+=1
                              con.notify()
                        print recnt
                  #time.sleep(2)           
                  con.wait()   
#send
def snd():
      global flag,sndc
      if con.acquire():
            while True:
                  sndc=sndset[randrange(16)]
                  if flag%2==0:
                        ser_snd.write('at+vts='+sndc+' ')
                        print str(flag)+' '+'Send...acttive....'
                        fid.write(str(flag)+' '+'Send...acttive.... ')
                        sndcnt=0
                        while 'OK ' not in snd_rd():
                              time.sleep(0.05)
                              sndcnt+=1
                              if sndcnt>20:
                                    print str(flag)+' '+'Send failed!!'                                
                                    flag+=1
                                    con.notify()
                                    break
                        else:
                                    flag+=1
                                    con.notify()
                        print sndcnt
                  #time.sleep(2)
                  con.wait()
 
 
print 'Main Thrd:: starting at: '+time.ctime()+' '
 
t_snd=threading.Thread(target=snd)
t_recv=threading.Thread(target=recv)                      
 
t_snd.start()
t_recv.start()
t_snd.join()
t_recv.join()
原文地址:https://www.cnblogs.com/eiguleo/p/3879106.html