Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
f2b7090
modify analyzer.py indent and add ssl to synchronizer
kun22kun May 30, 2015
958c773
modify indent of analyzer
kun22kun May 30, 2015
671b0d2
combine to into two python files
kun22kun May 30, 2015
cdf3f5e
code review
Jun 1, 2015
60439de
add scp, remove netcat
Jun 2, 2015
66d775a
code review
Jun 2, 2015
ad0f4c9
add some prompts
Jun 8, 2015
2d9d725
redirect output to log, rm certificate and private key file of ssl af…
Jun 8, 2015
1b7efbe
rm xml read
Jun 12, 2015
64e64b2
rm writing xml file
Jun 13, 2015
5b8cc34
rm sending timestamp, offset and receiving
Jun 13, 2015
88dd06d
add md5 test
Jun 17, 2015
bb54999
create syn_log if it doesn't exist
Jun 18, 2015
a48d0eb
modify submit file
Jun 18, 2015
5949df1
use dagman to submit several similar jobs
Jun 24, 2015
32dbf75
deal when no update
kun22kun Jun 25, 2015
997e9ac
Merge branch 'develop' of github.com:iDPLcn/idpl_parser into develop
kun22kun Jun 25, 2015
6647f84
rm xmlHandler, fix the bug of client md5 receive timeout
Jun 26, 2015
6c5ce9c
revise readme
Jul 5, 2015
cc9c3af
add the dag cron file
Jul 20, 2015
a2de14d
move regular expression set to the submit file
Jul 20, 2015
07e9686
add reg_exp variable in submit file
Jul 21, 2015
eab52fe
modify 'usage()'
Jul 21, 2015
44216f2
modify openssl cmd
Jul 21, 2015
b33a71a
repair debug of reg_exp addition
Jul 22, 2015
af6f180
revise default reg_exp
kun22kun Jul 22, 2015
01364d9
add new shell and modify regular input way
kun22kun Jul 29, 2015
5a2ad1a
datasize units of tools
kun22kun Jul 29, 2015
e783c45
repair syntax error
kun22kun Jul 29, 2015
47c7c5c
add schedule
kun22kun Jul 29, 2015
f6746e9
add synchronization schedule
kun22kun Jul 30, 2015
4a5a61a
modify analysis schedule
kun22kun Jul 30, 2015
79387a3
modify schedules of analysis and synchronization
kun22kun Jul 30, 2015
ea19b04
add fdt and modifty schedule of analysis
kun22kun Jul 30, 2015
96477dd
modify schedule of synchronization
kun22kun Jul 31, 2015
30ea3c9
modify the offset due to new format of post shell command which adds …
kun22kun Aug 3, 2015
3a89066
add schedule.sh
kun22kun Aug 3, 2015
c26d3a9
fix name conflict between sys var and my var
kun22kun Aug 4, 2015
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 35 additions & 40 deletions analyzer/analyzer.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,41 +2,48 @@
import re

class Analyzer:
#regOfIperf = "'iperf.*'"
def match(self, reg, strToMatch, result):
pattern = re.compile(reg)
match = pattern.search(strToMatch)
if match:
result = match.group()[1:(len(result) - 1)]
#print("bingo! " + result)
return True, result
return False,result
""" use regular expression to judge if a line matches """
def match(self, reg, strToMatch, result):
pattern = re.compile(reg)
match = pattern.search(strToMatch)
if match:
result = match.group()[1:-1]
return True, result
return False,result

def combi(self, strToCombi, tool):
""" combine bandwidth into a line """
def combi(self, strToCombi, tool):
strArray = strToCombi.split(',')[1:]
if tool == "netcat":
datasize = strArray[len(strArray) - 1]
strArray[len(strArray) - 1] = str(float(datasize) / 1024)
if (not self.deal(strArray)):
return False, ''
#print(strArray)
seq = ' '
strToCombi = seq.join(strArray)
#print(strToCombi)

""" transform the unit of datasize in scp from B to KB """
if tool != "iperf":
datasize = strArray[-1]
strArray[-1] = str(float(datasize) / 1024)

""" append bandwidth and remove the point whose bandwidth is 0 """
if (not self.deal(strArray, tool)):
return [False, '']

strToCombi = ' '.join(strArray)
return [True, strToCombi]

def deal(self, strArray):
bandwidth = float('%0.2f'%((float(strArray[len(strArray) - 1]) * 1024 * 8) / float(strArray[len(strArray) - 2])))
if (abs(bandwidth) <= 0.000001):
return False
strArray.append(str(bandwidth))
return True
#print(bandwidth)
""" compute and append bandwidth """
def deal(self, strArray, tool):
bandwidth = float('%0.2f'%((float(strArray[-1]) * 1024 * 8) / float(strArray[-2])))
""" remove the point whose bandwidth is 0 """
if (abs(bandwidth) <= 0.000001):
return False

def analyze(self, strToMatch, tools):
strArray.append(str(bandwidth))
strArray.append(tool)
return True

""" analyze a line """
def analyze(self, strToMatch, tools, reg_prefix):
result = ''
for tool in tools:
reg = "'" + tool + ".*'"
reg = "'" + reg_prefix + tool + ".*'"
matchResult = self.match(reg, strToMatch, result)
if matchResult[0]:
resultSet = self.combi(matchResult[1], tool)
Expand All @@ -46,15 +53,3 @@ def analyze(self, strToMatch, tools):
resultSet = [False, result, tool]
return resultSet




'''
file = open("D:\\Homework\\GraduationProject\\analyzer\\placement.log")
for line in file:
print(line)
file.close

analyzer = Analyzer()
analyzer.analyze()
'''
76 changes: 48 additions & 28 deletions analyzer/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,70 +6,90 @@
import sys,getopt

class Client:
#uriLog = '/home/idpl/results/idpl.cnic/b2c.log'
#uriTime = '/home/kunq/DataAccessor0.12/timeRead.txt'
#uriLog = '/home/kun/GraduationProject/b2c.log'
#uriTime = '/home/kun/GraduationProject/DataAccessor0.12/timeRead.txt'
uriLog = ""
uriTime = ""
shellPath = ""
sorceFile = []
def __init__(self):
self.uriLog = ""
self.uriTime = ""
self.shellPath = ""
self.reg_prefix = "undefined"
self.sourceFile = []

def usage(self):
print("client.py -l <log file> -t <timeRead file> -s <shell path> [-r <regular>]")

def getOptions(self):
opts, args = getopt.getopt(sys.argv[1:], "hl:t:s:", ["help", "log=", "timeStamp=", "shellScript="])

if len(sys.argv) < 7:
self.usage()
sys.exit()

opts, args = getopt.getopt(sys.argv[1:], "hl:t:s:r:", ["help", "log=", "timeStamp=", "shellScript=", "regular="])
for op,value in opts:
if op in ("-h", "--help"):
print("client.py -l <log file> -t <timeRead file> -s <shell path>")
sys.exit(1)
self.usage()
sys.exit()
elif op in ("-l", "--log"):
self.uriLog = value
elif op in ("-t", "--timeStamp"):
self.uriTime = value
elif op in ("-s", "--shellScript"):
self.shellPath = value
elif op in ("-r", "--regular"):
self.reg_prefix = value
if self.reg_prefix == "undefined":
self.reg_prefix = ".*writerecord:"
elif self.reg_prefix == "NULL":
self.reg_prefix = ""

""" read the log rotated """
def readLog(self, uri):
#TODO
with open(uri) as self.sourceFile:
return self.sourceFile.readlines(), True

def closeFile(self):
self.sourceFile.close

def combi(self, result, reg):
return self.shellPath + "post_" + reg + "_time.sh" + " " + result + " USERNAME=username PASSWORD=password HOSTNAME=hostname:port"

""" choose the corresponding shell to insert into database """
def combi(self, result, tool):
return self.shellPath + "post_measuredata.sh " + result + " USERNAME=username PASSWORD=password HOSTNAME=hostname:port"

""" insert data into database """
def excuteShell(self, result):
output = os.popen(result)
#print(output.read())

""" check the data if inserted
if timestmap in the line analyzed now less than timestamp last, indicate the data is inserted
else if timestamps are equal, if tool is equal, indicate the data is inserted
else the data has not inserted"""
def check(self, result, timeR, offset):
resultArray = result[1].split(' ')
if(float(resultArray[len(resultArray) - offset]) < float(timeR[1])):
if(float(resultArray[-offset]) < float(timeR[1])):
return True
elif(abs(float(resultArray[len(resultArray) - offset]) - float(timeR[1])) < 0.000001):
elif(abs(float(resultArray[-offset]) - float(timeR[1])) < 0.000001):
return result[2] == timeR[0]
return False

def splitStr(self, string, char, offset):
stringArray = string.split(char)
return stringArray[len(stringArray) - offset]

return stringArray[-offset]

""" get the timestamp last read to """
def readTimeRead(self, uri):
try:
timeReadFile = open(uri)
except:
print "file not exists!"
print "timestamp file not exists!"
timeRead = timeReadFile.read().strip("\n")
timeReadFile.close
return timeRead

def main(self, analyzer):

offset = 6
offset = 7
isFinished = False
isNewTime = True
self.getOptions()
tools = ["iperf", "netcat"]
tools = ["iperf", "scp", "netcat", "fdt"]

if not os.path.exists(self.uriTime):
print('WARN! Create a timeRead file!')
Expand All @@ -79,12 +99,13 @@ def main(self, analyzer):
timeR = timeRead.split(",")
timeRNew = timeRead

""" analyze log """
while(not isFinished):
#TODO read log rotated
fileLines, isFinished = self.readLog(self.uriLog)

for i in range (len(fileLines) - 1, 0, -1):
#print(line)
result = analyzer.analyze(fileLines[i], tools)
for line in fileLines[::-1]:
result = analyzer.analyze(line, tools, self.reg_prefix)
if result[0]:
if self.check(result, timeR, offset):
isFinished = True
Expand All @@ -94,12 +115,11 @@ def main(self, analyzer):
timestampNew = self.splitStr(result[1], ' ', offset)
timeRNew = result[2] + "," + timestampNew
isNewTime = False
print self.combi(result[1], result[2])
#self.excuteShell(self.combi(result[1], result[2]))
#print self.combi(result[1], result[2])
self.excuteShell(self.combi(result[1], result[2]))
self.closeFile()

with open(self.uriTime, 'w') as timeReadFile:
#print(timeRNew)
timeReadFile.write(timeRNew)


Expand Down
23 changes: 23 additions & 0 deletions analyzer/post_measuredata.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
#!/bin/sh

USERNAME="idpl"
PASSWORD="idpl"
HOSTNAME="localhost:8000"
API_URI="/condor/measurementdata/"

SENDER=$1
RECEIVER=$2
TIME_START=$3
TIME_END=$4
CHECKSUM_EQUAL=$5
DURATION=$6
DATA_SIZE=$7
BANDWIDTH=$8
MEASUREMENT=$9

shift 9
export "$@"

API_URL=http://$HOSTNAME$API_URI

curl -u $USERNAME:$PASSWORD -H "Content-Type: application/json" -d "{\"source\": \"$SENDER\", \"destination\": \"$RECEIVER\", \"time_start\": $TIME_START, \"time_end\": $TIME_END, \"md5_equal\": $CHECKSUM_EQUAL, \"duration\": $DURATION, \"data_size\": $DATA_SIZE, \"bandwidth\": $BANDWIDTH, \"measurement\": \"$MEASUREMENT\"}" $API_URL
6 changes: 5 additions & 1 deletion analyzer/readme
Original file line number Diff line number Diff line change
@@ -1 +1,5 @@
scripts to synchronize data on the other nodes
scripts to analyze and deal with data

unit of datasize:
iperf KB
netcat, scp, fdt B
25 changes: 25 additions & 0 deletions analyzer/schedule
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
International
buaa 11:55:00 23:55:00
buaa2wisc
buaa2cnic
buaa2ucsd
wisc 00:05:00 12:05:00
wisc2buaa
wisc2cnic
wisc2ucsd
ucsd 00:05:00 12:05:00
ucsd2buaa
ucsd2cnic
ucsd2wisc
cnic #TODO

Domestic
buaa 11:55:00 23:55:00
buaa2cnic
buaa2yunnan
yunnan 00:05:00 12:05:00
yunnan2buaa
yunnan2cnic
cnic 00:05:00 00:05:00
cnic2buaa
cnic2yunnan
35 changes: 35 additions & 0 deletions analyzer/schedule.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,35 @@
#!/bin/sh
#buaa
buaa2ucsd="/home/zwzhang/placement4/buaa2ucsd/placement4.log /home/idpl/results/buaa2ucsd/timeRead /home/kunq/logs_analyzer/buaa/buaa2ucsd_out.log"
buaa2cnic="/home/zwzhang/placement4/buaa2cnic/placement4.log /home/idpl/results/buaa2cnic/timeRead /home/kunq/logs_analyzer/buaa/buaa2cnic_out.log"
buaa2wisc="/home/zwzhang/placement4/buaa2wisc/placement4.log /home/idpl/results/buaa2wisc/timeRead /home/kunq/logs_analyzer/buaa/buaa2wisc_out.log"

#wisc
wisc2buaa="/home/idpl/results/wisc2buaa/placement4.log /home/idpl/results/wisc2buaa/timeRead /home/kunq/logs_analyzer/wisc/wisc2buaa_out.log"
wisc2ucsd="/home/idpl/results/wisc2ucsd/placement4.log /home/idpl/results/wisc2ucsd/timeRead /home/kunq/logs_analyzer/wisc/wisc2ucsd_out.log"
wisc2cnic="/home/idpl/results/wisc2cnic/placement4.log /home/idpl/results/wisc2cnic/timeRead /home/kunq/logs_analyzer/wisc/wisc2cnic_out.log"
wisc2calit2="/home/idpl/results/wisc2calit2/placement4.log /home/idpl/results/wisc2calit2/timeRead /home/kunq/logs_analyzer/wisc/wisc2calit2_out.log"

#ucsd
ucsd2buaa="/home/idpl/results/ucsd2buaa/placement4.log /home/idpl/results/ucsd2buaa/timeRead /home/kunq/logs_analyzer/ucsd/ucsd2buaa_out.log"
ucsd2cnic="/home/idpl/results/ucsd2cnic/placement4.log /home/idpl/results/ucsd2cnic/timeRead /home/kunq/logs_analyzer/ucsd/ucsd2cnic_out.log"
ucsd2wisc="/home/idpl/results/ucsd2wisc/placement4.log /home/idpl/results/ucsd2wisc/timeRead /home/kunq/logs_analyzer/ucsd/ucsd2wisc_out.log"
physics2calit2="/home/idpl/results/physics2calit2/placement4.log /home/idpl/results/physics2calit2/timeRead /home/kunq/logs_analyzer/physics/physics2calit2_out.log"

#calit2
calit2physics="/home/idpl/results/calit2physics/placement4.log /home/idpl/results/calit2physics/timeRead /home/kunq/logs_analyzer/calit2/calit2physics_out.log"
calit2wisc="/home/idpl/results/calit2wisc/placement4.log /home/idpl/results/calit2wisc/timeRead /home/kunq/logs_analyzer/calit2/calit2wisc_out.log"



WORKSPACE="/home/kunq/analyzer0728/"
ANALYZER_PATH="${WORKSPACE}client.py"
SHELL_PATH=${WORKSPACE}
ALL_PARAMS=(buaa2ucsd buaa2cnic buaa2wisc wisc2buaa wisc2ucsd wisc2cnic wisc2calit2 ucsd2buaa ucsd2cnic ucsd2wisc physics2calit2 calit2physics calit2wisc)

for index in ${ALL_PARAMS[@]}
do
eval params=\$${index}
params_arr=($params)
${ANALYZER_PATH} -l ${params_arr[0]} -t ${params_arr[1]} -s ${SHELL_PATH} >> ${params_arr[2]} 2>&1
done
Loading