diff --git a/analyzer/analyzer.py b/analyzer/analyzer.py index 1e5be88..5f78969 100755 --- a/analyzer/analyzer.py +++ b/analyzer/analyzer.py @@ -2,41 +2,48 @@ import re class Analyzer: - #regOfIperf = "'iperf.*'" - def match(self, reg, strToMatch, result): - pattern = re.compile(reg) - match = pattern.search(strToMatch) - if match: - result = match.group()[1:(len(result) - 1)] - #print("bingo! " + result) - return True, result - return False,result + """ use regular expression to judge if a line matches """ + def match(self, reg, strToMatch, result): + pattern = re.compile(reg) + match = pattern.search(strToMatch) + if match: + result = match.group()[1:-1] + return True, result + return False,result - def combi(self, strToCombi, tool): + """ combine bandwidth into a line """ + def combi(self, strToCombi, tool): strArray = strToCombi.split(',')[1:] - if tool == "netcat": - datasize = strArray[len(strArray) - 1] - strArray[len(strArray) - 1] = str(float(datasize) / 1024) - if (not self.deal(strArray)): - return False, '' - #print(strArray) - seq = ' ' - strToCombi = seq.join(strArray) - #print(strToCombi) + + """ transform the unit of datasize in scp from B to KB """ + if tool != "iperf": + datasize = strArray[-1] + strArray[-1] = str(float(datasize) / 1024) + + """ append bandwidth and remove the point whose bandwidth is 0 """ + if (not self.deal(strArray, tool)): + return [False, ''] + + strToCombi = ' '.join(strArray) return [True, strToCombi] - def deal(self, strArray): - bandwidth = float('%0.2f'%((float(strArray[len(strArray) - 1]) * 1024 * 8) / float(strArray[len(strArray) - 2]))) - if (abs(bandwidth) <= 0.000001): - return False - strArray.append(str(bandwidth)) - return True - #print(bandwidth) + """ compute and append bandwidth """ + def deal(self, strArray, tool): + bandwidth = float('%0.2f'%((float(strArray[-1]) * 1024 * 8) / float(strArray[-2]))) + + """ remove the point whose bandwidth is 0 """ + if (abs(bandwidth) <= 0.000001): + return False - def analyze(self, strToMatch, tools): + strArray.append(str(bandwidth)) + strArray.append(tool) + return True + + """ analyze a line """ + def analyze(self, strToMatch, tools, reg_prefix): result = '' for tool in tools: - reg = "'" + tool + ".*'" + reg = "'" + reg_prefix + tool + ".*'" matchResult = self.match(reg, strToMatch, result) if matchResult[0]: resultSet = self.combi(matchResult[1], tool) @@ -46,15 +53,3 @@ def analyze(self, strToMatch, tools): resultSet = [False, result, tool] return resultSet - - - -''' -file = open("D:\\Homework\\GraduationProject\\analyzer\\placement.log") -for line in file: - print(line) -file.close - -analyzer = Analyzer() -analyzer.analyze() -''' diff --git a/analyzer/client.py b/analyzer/client.py index a68da24..b9110ae 100755 --- a/analyzer/client.py +++ b/analyzer/client.py @@ -6,70 +6,90 @@ import sys,getopt class Client: - #uriLog = '/home/idpl/results/idpl.cnic/b2c.log' - #uriTime = '/home/kunq/DataAccessor0.12/timeRead.txt' - #uriLog = '/home/kun/GraduationProject/b2c.log' - #uriTime = '/home/kun/GraduationProject/DataAccessor0.12/timeRead.txt' - uriLog = "" - uriTime = "" - shellPath = "" - sorceFile = [] + def __init__(self): + self.uriLog = "" + self.uriTime = "" + self.shellPath = "" + self.reg_prefix = "undefined" + self.sourceFile = [] + + def usage(self): + print("client.py -l -t -s [-r ]") def getOptions(self): - opts, args = getopt.getopt(sys.argv[1:], "hl:t:s:", ["help", "log=", "timeStamp=", "shellScript="]) + + if len(sys.argv) < 7: + self.usage() + sys.exit() + + opts, args = getopt.getopt(sys.argv[1:], "hl:t:s:r:", ["help", "log=", "timeStamp=", "shellScript=", "regular="]) for op,value in opts: if op in ("-h", "--help"): - print("client.py -l -t -s ") - sys.exit(1) + self.usage() + sys.exit() elif op in ("-l", "--log"): self.uriLog = value elif op in ("-t", "--timeStamp"): self.uriTime = value elif op in ("-s", "--shellScript"): self.shellPath = value + elif op in ("-r", "--regular"): + self.reg_prefix = value + if self.reg_prefix == "undefined": + self.reg_prefix = ".*writerecord:" + elif self.reg_prefix == "NULL": + self.reg_prefix = "" + """ read the log rotated """ def readLog(self, uri): + #TODO with open(uri) as self.sourceFile: return self.sourceFile.readlines(), True def closeFile(self): self.sourceFile.close - def combi(self, result, reg): - return self.shellPath + "post_" + reg + "_time.sh" + " " + result + " USERNAME=username PASSWORD=password HOSTNAME=hostname:port" - + """ choose the corresponding shell to insert into database """ + def combi(self, result, tool): + return self.shellPath + "post_measuredata.sh " + result + " USERNAME=username PASSWORD=password HOSTNAME=hostname:port" + + """ insert data into database """ def excuteShell(self, result): output = os.popen(result) - #print(output.read()) + """ check the data if inserted + if timestmap in the line analyzed now less than timestamp last, indicate the data is inserted + else if timestamps are equal, if tool is equal, indicate the data is inserted + else the data has not inserted""" def check(self, result, timeR, offset): resultArray = result[1].split(' ') - if(float(resultArray[len(resultArray) - offset]) < float(timeR[1])): + if(float(resultArray[-offset]) < float(timeR[1])): return True - elif(abs(float(resultArray[len(resultArray) - offset]) - float(timeR[1])) < 0.000001): + elif(abs(float(resultArray[-offset]) - float(timeR[1])) < 0.000001): return result[2] == timeR[0] return False def splitStr(self, string, char, offset): stringArray = string.split(char) - return stringArray[len(stringArray) - offset] - + return stringArray[-offset] + + """ get the timestamp last read to """ def readTimeRead(self, uri): try: timeReadFile = open(uri) except: - print "file not exists!" + print "timestamp file not exists!" timeRead = timeReadFile.read().strip("\n") timeReadFile.close return timeRead def main(self, analyzer): - offset = 6 + offset = 7 isFinished = False isNewTime = True self.getOptions() - tools = ["iperf", "netcat"] + tools = ["iperf", "scp", "netcat", "fdt"] if not os.path.exists(self.uriTime): print('WARN! Create a timeRead file!') @@ -79,12 +99,13 @@ def main(self, analyzer): timeR = timeRead.split(",") timeRNew = timeRead + """ analyze log """ while(not isFinished): + #TODO read log rotated fileLines, isFinished = self.readLog(self.uriLog) - for i in range (len(fileLines) - 1, 0, -1): - #print(line) - result = analyzer.analyze(fileLines[i], tools) + for line in fileLines[::-1]: + result = analyzer.analyze(line, tools, self.reg_prefix) if result[0]: if self.check(result, timeR, offset): isFinished = True @@ -94,12 +115,11 @@ def main(self, analyzer): timestampNew = self.splitStr(result[1], ' ', offset) timeRNew = result[2] + "," + timestampNew isNewTime = False - print self.combi(result[1], result[2]) - #self.excuteShell(self.combi(result[1], result[2])) + #print self.combi(result[1], result[2]) + self.excuteShell(self.combi(result[1], result[2])) self.closeFile() with open(self.uriTime, 'w') as timeReadFile: - #print(timeRNew) timeReadFile.write(timeRNew) diff --git a/analyzer/post_measuredata.sh b/analyzer/post_measuredata.sh new file mode 100755 index 0000000..23efeb4 --- /dev/null +++ b/analyzer/post_measuredata.sh @@ -0,0 +1,23 @@ +#!/bin/sh + +USERNAME="idpl" +PASSWORD="idpl" +HOSTNAME="localhost:8000" +API_URI="/condor/measurementdata/" + +SENDER=$1 +RECEIVER=$2 +TIME_START=$3 +TIME_END=$4 +CHECKSUM_EQUAL=$5 +DURATION=$6 +DATA_SIZE=$7 +BANDWIDTH=$8 +MEASUREMENT=$9 + +shift 9 +export "$@" + +API_URL=http://$HOSTNAME$API_URI + +curl -u $USERNAME:$PASSWORD -H "Content-Type: application/json" -d "{\"source\": \"$SENDER\", \"destination\": \"$RECEIVER\", \"time_start\": $TIME_START, \"time_end\": $TIME_END, \"md5_equal\": $CHECKSUM_EQUAL, \"duration\": $DURATION, \"data_size\": $DATA_SIZE, \"bandwidth\": $BANDWIDTH, \"measurement\": \"$MEASUREMENT\"}" $API_URL diff --git a/analyzer/readme b/analyzer/readme index 81512af..558916a 100644 --- a/analyzer/readme +++ b/analyzer/readme @@ -1 +1,5 @@ -scripts to synchronize data on the other nodes +scripts to analyze and deal with data + +unit of datasize: +iperf KB +netcat, scp, fdt B diff --git a/analyzer/schedule b/analyzer/schedule new file mode 100644 index 0000000..0cbbefb --- /dev/null +++ b/analyzer/schedule @@ -0,0 +1,25 @@ +International + buaa 11:55:00 23:55:00 + buaa2wisc + buaa2cnic + buaa2ucsd + wisc 00:05:00 12:05:00 + wisc2buaa + wisc2cnic + wisc2ucsd + ucsd 00:05:00 12:05:00 + ucsd2buaa + ucsd2cnic + ucsd2wisc + cnic #TODO + +Domestic + buaa 11:55:00 23:55:00 + buaa2cnic + buaa2yunnan + yunnan 00:05:00 12:05:00 + yunnan2buaa + yunnan2cnic + cnic 00:05:00 00:05:00 + cnic2buaa + cnic2yunnan diff --git a/analyzer/schedule.sh b/analyzer/schedule.sh new file mode 100755 index 0000000..50d8bb4 --- /dev/null +++ b/analyzer/schedule.sh @@ -0,0 +1,35 @@ +#!/bin/sh +#buaa +buaa2ucsd="/home/zwzhang/placement4/buaa2ucsd/placement4.log /home/idpl/results/buaa2ucsd/timeRead /home/kunq/logs_analyzer/buaa/buaa2ucsd_out.log" +buaa2cnic="/home/zwzhang/placement4/buaa2cnic/placement4.log /home/idpl/results/buaa2cnic/timeRead /home/kunq/logs_analyzer/buaa/buaa2cnic_out.log" +buaa2wisc="/home/zwzhang/placement4/buaa2wisc/placement4.log /home/idpl/results/buaa2wisc/timeRead /home/kunq/logs_analyzer/buaa/buaa2wisc_out.log" + +#wisc +wisc2buaa="/home/idpl/results/wisc2buaa/placement4.log /home/idpl/results/wisc2buaa/timeRead /home/kunq/logs_analyzer/wisc/wisc2buaa_out.log" +wisc2ucsd="/home/idpl/results/wisc2ucsd/placement4.log /home/idpl/results/wisc2ucsd/timeRead /home/kunq/logs_analyzer/wisc/wisc2ucsd_out.log" +wisc2cnic="/home/idpl/results/wisc2cnic/placement4.log /home/idpl/results/wisc2cnic/timeRead /home/kunq/logs_analyzer/wisc/wisc2cnic_out.log" +wisc2calit2="/home/idpl/results/wisc2calit2/placement4.log /home/idpl/results/wisc2calit2/timeRead /home/kunq/logs_analyzer/wisc/wisc2calit2_out.log" + +#ucsd +ucsd2buaa="/home/idpl/results/ucsd2buaa/placement4.log /home/idpl/results/ucsd2buaa/timeRead /home/kunq/logs_analyzer/ucsd/ucsd2buaa_out.log" +ucsd2cnic="/home/idpl/results/ucsd2cnic/placement4.log /home/idpl/results/ucsd2cnic/timeRead /home/kunq/logs_analyzer/ucsd/ucsd2cnic_out.log" +ucsd2wisc="/home/idpl/results/ucsd2wisc/placement4.log /home/idpl/results/ucsd2wisc/timeRead /home/kunq/logs_analyzer/ucsd/ucsd2wisc_out.log" +physics2calit2="/home/idpl/results/physics2calit2/placement4.log /home/idpl/results/physics2calit2/timeRead /home/kunq/logs_analyzer/physics/physics2calit2_out.log" + +#calit2 +calit2physics="/home/idpl/results/calit2physics/placement4.log /home/idpl/results/calit2physics/timeRead /home/kunq/logs_analyzer/calit2/calit2physics_out.log" +calit2wisc="/home/idpl/results/calit2wisc/placement4.log /home/idpl/results/calit2wisc/timeRead /home/kunq/logs_analyzer/calit2/calit2wisc_out.log" + + + +WORKSPACE="/home/kunq/analyzer0728/" +ANALYZER_PATH="${WORKSPACE}client.py" +SHELL_PATH=${WORKSPACE} +ALL_PARAMS=(buaa2ucsd buaa2cnic buaa2wisc wisc2buaa wisc2ucsd wisc2cnic wisc2calit2 ucsd2buaa ucsd2cnic ucsd2wisc physics2calit2 calit2physics calit2wisc) + +for index in ${ALL_PARAMS[@]} +do + eval params=\$${index} + params_arr=($params) + ${ANALYZER_PATH} -l ${params_arr[0]} -t ${params_arr[1]} -s ${SHELL_PATH} >> ${params_arr[2]} 2>&1 +done diff --git a/synchronizer/fileReader.py b/synchronizer/fileReader.py deleted file mode 100755 index 3823bae..0000000 --- a/synchronizer/fileReader.py +++ /dev/null @@ -1,86 +0,0 @@ -#!/usr/bin/env python -# encoding: UTF-8 - -import re,os - -class FileReader: - - fileUri = "" - - offsetLast = 0 - offsetNew = 0 - offsetNow = 0 - timestampNew = "" - isHasChanged = False - - def read(self, fileUriNow): - with open(fileUriNow) as text: - #print("fileReader, file is " + fileUriNow) - return text.readlines() - - def getALine(self, fileLines): - return fileLines.pop() - - def match(self, reg, strToMatch, timestamp): - match = re.compile(reg).search(strToMatch) - - if match: - timestampNow = match.group()[1:(len(match.group()) - 1)].split(',')[3] - #print (timestampNow) - if not self.isHasChanged: - #print (self.offsetNow) - self.timestampNew = timestampNow - self.offsetNew = self.offsetNow - self.isHasChanged = True - return float(timestampNow) > timestamp - else: - if not self.isHasChanged: - #print("add!!!!") - self.offsetNow += 1 - #print(self.offsetNow) - return True - - def getNewFilePath(self, suffix): - return self.fileUri + '.' + str(suffix), suffix + 1 - - - def chooseLinesInAFile(self, fileLines, reg, timestamp, strAdded, isTimeReached): - while len(fileLines): - line = self.getALine(fileLines) - #print ("line: " + line) - if self.match(reg, line, timestamp): - strAdded.insert(0, line) - else: - isTimeReached = True - break - return strAdded, isTimeReached - - - def chooseLines(self, timestamp, offsetL, path): - #self.fileUri = "/home/kun/GraduationProject/ServerClient0.11/test.txt" - self.fileUri = path - print "file is: " + self.fileUri + " t is: " + str(timestamp) + " o is: " + str(offsetL) - self.offsetLast = offsetL - reg = "'iperf.*'" - isTimeReached = False - strAdded = [] - fileUriNow = self.fileUri - suffix = 0 - if not os.path.exists(fileUriNow): - print("file not exist!") - sys.exit(0) - while (not isTimeReached and os.path.exists(fileUriNow)): - fileLines = self.read(fileUriNow) - strAdded, isTimeReached = self.chooseLinesInAFile(fileLines, reg, timestamp, strAdded, isTimeReached) - fileUriNow, suffix = self.getNewFilePath(suffix) - print ("isTimeReached: " + str(isTimeReached) + " file is: " + fileUriNow + " suffix is: " + str(suffix) + " is: " + str(os.path.exists(fileUriNow))) - #print (''.join(strAdded)) - #print("Now offset is: " + str(self.offsetNew)) - return ''.join(strAdded[self.offsetLast:]), self.timestampNew, str(self.offsetNew) - -if __name__ == '__main__': - timestamp = 0 - fileReader = FileReader() - fileReader.chooseLines(timestamp) - - diff --git a/synchronizer/readme b/synchronizer/readme index 99c7fd7..81512af 100644 --- a/synchronizer/readme +++ b/synchronizer/readme @@ -1 +1 @@ -scripts to analyze and deal with data +scripts to synchronize data on the other nodes diff --git a/synchronizer/schedule b/synchronizer/schedule new file mode 100644 index 0000000..21b7dd3 --- /dev/null +++ b/synchronizer/schedule @@ -0,0 +1,22 @@ +International: + cnic + 11:45:00 + 23:45:00 + komatsu + 11:50:00 + 23:50:00 + murpa + 11:55:00 + 23:55:00 + +Domestic + cnic + 11:35:00 + 23:35:00 + yunnan + 11:55:00 + 23:55:00 + + + + diff --git a/synchronizer/socketClient.py b/synchronizer/socketClient.py deleted file mode 100755 index 9be87a6..0000000 --- a/synchronizer/socketClient.py +++ /dev/null @@ -1,96 +0,0 @@ -#!/usr/bin/env python -# encoding: UTF-8 - -import socket -import sys,getopt -import xmlReader -import CondorTools - -chirp = CondorTools.CondorChirp() -class Client: - - def __init__(self,config): - self.config = config - print ("client init") - - def get_constant(self, prefix): - """Create a dictionary mapping socket module constants to their names.""" - return dict( (getattr(socket, n), n) - for n in dir(socket) if n.startswith(prefix) - ) - - def get_constants(self, sock): - families = self.get_constant('AF_') - types = self.get_constant('SOCK_') - protocols = self.get_constant('IPPROTO_') - - print >>sys.stderr, 'Family :', families[sock.family] - print >>sys.stderr, 'Type :', types[sock.type] - print >>sys.stderr, 'Protocol:', protocols[sock.proto] - print >>sys.stderr - - def demand(self): - - print ("client demand") - try: - xmlHandler = xmlReader.XmlHandler(self.config) - except: - print "xml read error" - sys.exit() - host, port, path, timestamp, offset = xmlHandler.read() - print "five vals:" - print host, port, path, timestamp, offset - - interval = 5 - maxtries = 12*3 - serverInfo = chirp.getJobAttrWait("SocketServer",None,interval, maxtries) - print "serverInfo is:" + serverInfo - #hostFromCondor,portFromCondor = serverInfo.strip("'").split() - #print hostFromCondor, portFromCondor - # Create a TCP/IP socket - sock = socket.create_connection((host, int(port))) - self.get_constants(sock) - - print host, port, path, timestamp, offset - - try: - # Send data - message = timestamp + ',' + offset - sock.sendall(message) - amount_received = 0 - rec = sock.recv(64) - print("rec is: " + rec) - amount = int(rec) - #print amount - sock.sendall("kunBegin") - - dataComp = "" - while amount_received < int(amount): - print (amount_received, amount) - data = sock.recv(min(4096, int(amount) - amount_received)) - dataComp += data - amount_received += len(data) - - strAdded, timestamp, offset = dataComp.split("KUNSIGN") - - print strAdded - if not amount_received < amount: - with open(path, "a") as output: - output.write(strAdded) - print "time is " + timestamp - if timestamp and offset: - xmlHandler.write(timestamp, offset, self.config) - except: - sock.sendall("kunStop") - print "amount value error" - - finally: - print >>sys.stderr, 'closing socket' - sock.close() - print("dadada") - -if __name__ == '__main__': - if len(sys.argv) < 2: - print ("client val num error!") - client = Client(sys.argv[1]) - client.demand() diff --git a/synchronizer/socketServer.py b/synchronizer/socketServer.py deleted file mode 100755 index ff3f299..0000000 --- a/synchronizer/socketServer.py +++ /dev/null @@ -1,96 +0,0 @@ -#!/usr/bin/env python -# encoding: UTF-8 - -import socket -import sys -import time -from thread import * -import fileReader -import re, getopt -import CondorTools - -HOST = "" -PORT = 5022 -chirp = CondorTools.CondorChirp() - -class Server: - def __init__(self, path, host, port): - self.path = path - HOST = host - PORT = port - - def commuWithClient(self, conn): - #conn.send("welcome!\n") - timestamp = "" - timestampNew = "" - offset = "" - offsetNew = "" - while True: - data = conn.recv(64) - print ("data is: " + data + " type is: " + str(type(data))) - if self.match(r"\d+(\.\d+)?,\d+", data): - print("begin to read file!!") - timestamp, offset = data.split(',') - print "cat is: " + timestamp + " o is: " + offset + " p is: " + self.path - strAdded, timestampNew, offsetNew = fileReader.FileReader().chooseLines(float(timestamp), int(offset), self.path) - dataToSend = strAdded + "KUNSIGN" + timestampNew + "KUNSIGN" + offsetNew - print(len(strAdded)) - conn.sendall(str(len(dataToSend))) - elif self.match("kunBegin", data): - conn.sendall(dataToSend) - elif self.match("kunStop", data): - #print("value error") - raise - else: - print("Done!") - break - #conn.send("bye\n") - #conn.close() - - def match(self, reg, strToMatch): - return re.compile(reg).match(strToMatch) - - - - def changeFlag(self): - time.sleep(2) - chirp.setJobAttr("SocketServer","'%s %d'" % (HOST, PORT)) - print("ChangeFlag()") - - def serve(self): - s = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - print ("Socket created") - try: - s.bind((HOST, PORT)) - except socket.error, msg: - print ("Bind failed. Error Code : " + str(msg[0]) + " Message " + msg[1]) - sys.exit() - - print ("Socket bind complete") - - s.listen(1) - print ("socket now listening") - - flagOfCondor = "" - start_new_thread(self.changeFlag, ()) - conn, addr = s.accept() - print ("Connected with " + addr[0] + ":" + str(addr[1])) - try: - self.commuWithClient(conn) - except : - print ("value error!") - raise - finally: - s.close() - print ("socket close now !") - chirp.setJobAttr("SocketServer",None) - print("exit") - sys.exit() - -if __name__ == '__main__': - chirp.setJobAttr("SocketServer",None) - if len(sys.argv) < 4: - print ("server val num error") - server = Server(sys.argv[1], sys.argv[2], sys.argv[3]) - server.serve() - diff --git a/synchronizer/sslMain.py b/synchronizer/sslMain.py new file mode 100755 index 0000000..ff0040d --- /dev/null +++ b/synchronizer/sslMain.py @@ -0,0 +1,61 @@ +#! /usr/bin/env python +import os,time +import sys,getopt +import TimedExec +from IDPLException import * +import sslMover + +## ***************************** +## main routine +## ***************************** + +print("\n%s\n" % time.strftime('%Y-%m-%d %H:%M:%S', time.localtime(time.time()))) +timeout = 120 +sslexe = "./sslMover.py" +log_path = "" +port = "" +syn_log = "" +reg_exp = "" + +def usage(): + print("sslMain.py -l -p -s [-r ]") + +if len(sys.argv) < 7: + usage() + sys.exit() + +try: + opts, args = getopt.getopt(sys.argv[1:], "hl:p:s:r:", ["help", "log_path=", "port=", "syn_log=", "reg_exp="]) +except getopt.GetoptError: + usage() + sys.exit() + +for opt, arg in opts: + if opt in ("-h", "--help" ): + usage() + sys.exit() + elif opt in ("-l", "--log_path"): + log_path = arg + elif opt in ("-p", "--port"): + port = arg + elif opt in ("-s", "--syn_log"): + syn_log = arg + elif opt in ("-r", "--reg_exp"): + reg_exp = arg + +#sslMover.main(sys.argv[1:]) +if reg_exp == "": + reg_exp = "'.*writerecord:iperf.*'" + +resultcode,output,err=TimedExec.runTimedCmd(timeout,[sslexe, "-l", log_path, "-p", port, "-s", syn_log, "-r", reg_exp]) +sys.stdout.write("output: %s" % " ".join(output)) +sys.stderr.write("err: %s" % " ".join(err)) +if resultcode < 0: + side = int(os.environ['_CONDOR_PROCNO']) + sys.stderr.write("Timeout! Result code %d" % resultcode) + if side == 0: + raise TimeOutException("client") + else: + raise TimeOutException("server") + +# vim: ts=4:sw=4 diff --git a/synchronizer/sslMover.py b/synchronizer/sslMover.py new file mode 100755 index 0000000..25c620e --- /dev/null +++ b/synchronizer/sslMover.py @@ -0,0 +1,387 @@ +#!/usr/bin/env python +# encoding: UTF-8 + +import socket, ssl +import sys, os, traceback +import time, hashlib +from thread import * +import re, getopt +import CondorTools +from xml.etree import ElementTree as ET + +chirp = CondorTools.CondorChirp() + +class FileReader: + + def __init__(self): + self.fileUri = "" + self.offsetLast = 0 + self.offsetNew = 0 + self.offsetNow = 0 + self.timestampNew = "" + self.hasChanged = False + + def read(self, fileUriNow): + with open(fileUriNow) as text: + return text.readlines() + + def match(self, reg, strToMatch, timestamp): + """ identify if a line matches the rule to add. + if passes the regular expression test, + update timestampNew and offsetNew when first time passes + compare the timestamp, choose the line timestampNow > timestamp + else + add the line""" + + match = re.compile(reg).search(strToMatch) + if match: + timestampNow = match.group()[1:(len(match.group()) - 1)].split(',')[3] + if not self.hasChanged: + self.timestampNew = timestampNow + self.offsetNew = self.offsetNow + self.hasChanged = True + return float(timestampNow) > timestamp + else: + if not self.hasChanged: + self.offsetNow += 1 + return True + + def getNewFilePath(self, suffix): + """ get the log rotated """ + return "%s.%d" % (self.fileUri, suffix), suffix + 1 + + def chooseLinesInAFile(self, fileLines, reg, timestamp, strAdded, isTimeReached): + """ choose lines match the rules in a file """ + for line in fileLines[::-1]: + if self.match(reg, line, timestamp): + strAdded.insert(0, line) + else: + isTimeReached = True + break + return strAdded, isTimeReached + + """ get all lines in a transfer """ + def chooseLines(self, timestamp, offsetL, path, reg): + iam = "server" + ulog(iam, "extract data") + self.fileUri = path + self.offsetLast = offsetL + isTimeReached = False + strAdded = [] + fileUriNow = self.fileUri + suffix = 0 + + if not os.path.exists(fileUriNow): + ulog(iam, "file not exist!") + print("file not exist!") + sys.exit(0) + + """ scan all logs including rotated if exist """ + while (not isTimeReached and os.path.exists(fileUriNow)): + fileLines = self.read(fileUriNow) + strAdded, isTimeReached = self.chooseLinesInAFile(fileLines, reg, timestamp, strAdded, isTimeReached) + fileUriNow, suffix = self.getNewFilePath(suffix) + + return ''.join(strAdded[self.offsetLast:]), self.timestampNew, str(self.offsetNew) + +class TransmissionException(Exception): + def __init__(self, msg): + Exception.__init__(self, msg) + self.msg = msg + +class Server: + def __init__(self, path, port, reg_exp): + self.path = path + self.host = socket.getfqdn() + self.port = port + self.reg_exp = reg_exp + self.iam = "server" + + def commuWithClient(self, conn): + timestamp = "" + timestampNew = "" + offset = "" + offsetNew = "" + + """ get data to transfer via timestamp and offset from client and send data to client """ + while True: + data = conn.recv(64) + + if self.match(r"\d+(\.\d+)?,\d+", data): + timestamp, offset = data.split(',') + strAdded, timestampNew, offsetNew = FileReader().chooseLines(float(timestamp), int(offset), self.path, self.reg_exp) + dataToSend = "%s" % strAdded + lenOfData = len(dataToSend) + + """ nothing to update """ + if lenOfData == 0: + conn.sendall("NONE") + ulog(self.iam, "nothing to update") + break + + conn.sendall(str(len(dataToSend))) + md5OfData = hashlib.md5() + md5OfData.update(dataToSend) + chirp.setJobAttr("MD5OfData", "'%s'" % md5OfData.hexdigest()) + + elif self.match("BEGIN", data): + conn.sendall(dataToSend) + + elif self.match("STOP", data): + raise TransmissionException("value error") + + elif self.match("END", data): + break + + def match(self, reg, strToMatch): + return re.compile(reg).match(strToMatch) + + def changeFlag(self): + time.sleep(2) + chirp.setJobAttr("SSLServer","'%s %d'" % (self.host, int(self.port))) + + def serve(self): + """ create an x509 cert and an rsa private key """ + ulog(self.iam, "create an ssl certificate and a private key") + path = "./" + certpath = "%scert.pem" % path + keypath = "%skey.pem" % path + os.popen("openssl req -newkey rsa:1024 -x509 -days 365 -nodes -out %s -keyout %s -batch" % (certpath, keypath)) + + """ transfer SSL certificate to client via chirp""" + ulog(self.iam, "send certificate to client") + certStr = "" + with open(certpath) as cert: + for line in cert.readlines(): + certStr = "%s%s" % (certStr, line.strip("\n")) + chirp.setJobAttr("SSLCert", "'%s'" % certStr) + + """ create a socket""" + ulog(self.iam, "create a sockect connection") + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + sock.bind(("", int(self.port))) + except socket.error, msg: + ulog(self.iam, "Bind failed. Error Code: %s Message %s" % (str(msg[0]), msg[1])) + print("Bind failed. Error Code: %s Message %s" % (str(msg[0]), msg[1])) + sys.exit() + + """ wait to connect from client """ + sock.listen(1) + ulog(self.iam, "set SSLServer chirp") + start_new_thread(self.changeFlag, ()) + conn, addr = sock.accept() + + """ wrap socket via ssl """ + ulog(self.iam, "transaction with client") + conn_ssl = ssl.wrap_socket(conn, server_side = True, certfile = certpath, keyfile = keypath) + try: + self.commuWithClient(conn_ssl) + except TransmissionException, trans: + ulog(self.iam, "%s" % trans.msg) + print("%s" % trans.msg) + traceback.print_exc() + + + finally: + sock.close() + ulog(self.iam, "socket close") + chirp.setJobAttr("SSLServer", None) + chirp.setJobAttr("SSLCert", None) + chirp.setJobAttr("MD5OfData", None) + if os.path.exists(certpath): + os.remove(certpath) + if os.path.exists(keypath): + os.remove(keypath) + sys.exit() + + +class Client: + + def __init__(self, syn_log, reg_exp): + self.syn_log = syn_log + self.reg_exp = reg_exp + self.iam = "client" + + def get_constant(self, prefix): + """Create a dictionary mapping socket module constants to their names.""" + return dict( + (getattr(socket, n), n) for n in dir(socket) if n.startswith(prefix) + ) + + def get_constants(self, sock): + families = self.get_constant('AF_') + types = self.get_constant('SOCK_') + protocols = self.get_constant('IPPROTO_') + + ulog(self.iam, 'Family : %s' % families[sock.family]) + ulog(self.iam, 'Type : %s' % types[sock.type]) + ulog(self.iam, 'Protocol: %s' % protocols[sock.proto]) + + def writeSSLCert(self, path, sslCert): + certBegin = sslCert[ : 27] + certEnd = sslCert[-25 : ] + #TODO if 27 > len -25 ? + certContent = sslCert[27 : -25] + certContentList = [] + + for i in range(0, len(certContent), 64): + line = certContent[i : i + 64] + certContentList.append(line + "\n") + certContent = "".join(certContentList) + certDealt = "%s\n%s%s" % (certBegin, certContent, certEnd) + + with open(path, "w") as certfile: + certfile.write(certDealt) + + def getTimestampOffset(self): + reg = self.reg_exp + pattern = re.compile(reg) + timestamp = "0" + offset = 0 + if not os.path.exists(self.syn_log): + f = open(self.syn_log, "w") + f.close + return timestamp, offset + + with open(self.syn_log) as s_log: + fileLines = s_log.readlines() + for line in fileLines[::-1]: + match = pattern.search(line) + if match: + timestamp = match.group()[1:-1].split(",")[3] + break + else: + offset += 1 + return timestamp, offset + + def request(self): + + timestamp, offset = self.getTimestampOffset() + + """ Get host and port from chirp """ + ulog(self.iam, "get SSLServer chirp") + interval = 5 + maxtries = 12*3 + serverInfo = chirp.getJobAttrWait("SSLServer",None,interval, maxtries) + host,port = serverInfo.strip("'").split() + + """ Write the ssl certificate """ + ulog(self.iam, "get ssl certificate from server") + certpath = "./cert.pem" + sslCert = chirp.getJobAttrWait("SSLCert", None, interval, maxtries).strip("'") + self.writeSSLCert(certpath, sslCert) + + """ Create a TCP/IP socket with SSL """ + ulog(self.iam, "create a connection with ssl") + sock = socket.create_connection((host, int(port))) + self.get_constants(sock) + sockSSL = ssl.wrap_socket(sock, ca_certs = certpath, cert_reqs = ssl.CERT_REQUIRED) + + + """ Send data """ + try: + + """ get amount of data to receive """ + ulog(self.iam, "begin to get data") + message = "%s,%s" % (timestamp, offset) + sockSSL.sendall(message) + rec = sockSSL.recv(64) + + """ nothing to update """ + if rec == "NONE": + sockSSL.close() + sock.close() + sys.exit() + + amount = int(rec) + amount_received = 0 + + """ receive data """ + sockSSL.sendall("BEGIN") + strAdded = "" + while amount_received < int(amount): + data = sockSSL.recv(min(4096, int(amount) - amount_received)) + strAdded += data + amount_received += len(data) + + """ get md5 from server and generate md5 data received """ + md5FromServer = chirp.getJobAttrWait("MD5OfData", None, interval, maxtries).strip("'") + md5LocalGen = hashlib.md5() + md5LocalGen.update(strAdded) + md5Local = md5LocalGen.hexdigest() + + sockSSL.sendall("END") + + """ write data to log """ + if not amount_received < amount and md5FromServer == md5Local: + with open(self.syn_log, "a") as output: + ulog(self.iam, "update log") + output.write(strAdded) + except Exception, e: + sockSSL.sendall("STOP") + traceback.print_exc() + + finally: + ulog(self.iam, 'closing socket') + sockSSL.close() + sock.close() + if os.path.exists(certpath): + os.remove(certpath) + +def ulog(who, message): + logMessage = "%s : %s" % (who, message) + chirp.ulog(logMessage) + +def usage(): + print("sslMain.py -l -p -s [-r ]") + +def main(argv): + print "argvs are: " + " ".join(argv) + if len(argv) < 6: + usage() + sys.exit() + + try: + opts, args = getopt.getopt(argv, "hl:p:s:r:", ["help", "log_path=", "port=", "syn_log=", "reg_exp="]) + except getopt.GetoptError: + usage() + sys.exit() + + reg_exp = "" + for opt, arg in opts: + if opt in ("-h", "--help" ): + usage() + sys.exit() + elif opt in ("-l", "--log_path"): + log_path = arg + elif opt in ("-p", "--port"): + port = arg + elif opt in ("-s", "--syn_log"): + syn_log = arg + elif opt in ("-r", "--reg_exp"): + reg_exp = arg + + if reg_exp == "": + reg_exp = "'.*writerecord:iperf.*'" + + if int(os.environ['_CONDOR_PROCNO']) == 0: + chirp.ulog("client start") + client = Client(syn_log, reg_exp) + client.request() + + else: + chirp.ulog("server start") + chirp.setJobAttr("SSLServer",None) + chirp.setJobAttr("SSLCert", None) + chirp.setJobAttr("MD5OfData", None) + server = Server(log_path, port, reg_exp) + server.serve() + chirp.setJobAttr("SSLServer", None) + chirp.setJobAttr("SSLCert", None) + chirp.setJobAttr("MD5OfData", None) + +if __name__ == '__main__': + main(sys.argv[1:]) + +# vim: ts=4:sw=4 diff --git a/synchronizer/server_client-submit b/synchronizer/ssl_move-submit similarity index 61% rename from synchronizer/server_client-submit rename to synchronizer/ssl_move-submit index d41ab45..ad2ab5f 100644 --- a/synchronizer/server_client-submit +++ b/synchronizer/ssl_move-submit @@ -5,21 +5,14 @@ ############ universe = parallel -executable = transmission.py +executable = sslMain.py +reg_exp='.*writerecord:iperf.*' - -SRC_HOST=mickey.buaa.edu.cn -#SRC_HOST=JSI-iDPL01 -#SRC_PATH=/home/phil/htcondor-8.2.2-4.x86_64.disk1.iso -DST_HOST=komatsu.chtc.wisc.edu -#DST_HOST=JSI-iDPL02 -#DST_PATH=htcondor-8.2.2-4.x86_64.disk1.iso -path=/home/phil/placement/placement2.log -#path=/tmp/testbykun/placement2.log -host= -port=5022 -#config=/tmp/testbykun/client_config.xml -config=/home/idpl/remoteLogs/komatsu/client_config.xml +SRC_HOST=JSI-iDPL01 +DST_HOST=JSI-iDPL02 +#log_path=/tmp/testbykun/test.txt +port=8888 +#syn_log=/tmp/testbykun/test1.txt ### Crondor Settings # A promise that jobs will not run more often than this (in seconds) @@ -28,13 +21,14 @@ config=/home/idpl/remoteLogs/komatsu/client_config.xml # A run is allowed to take this long (in seconds) to set up; otherwise # that run is skipped -cron_window=60 +#cron_window=60 # Try to run jobs on this schedule -cron_minute=0-59/30 +#cron_minute=40,45 +#cron_hour=11,23 # # Keep running the job -on_exit_remove=false +#on_exit_remove=false # Arguments are: # 1. Sending host @@ -42,15 +36,15 @@ on_exit_remove=false # 3. Receiving host # 4. Location to write file (on the receiving host) #arguments = $(SRC_HOST) $(SRC_PATH) $(DST_HOST) $(DST_PATH) $(LEASE) -arguments= $(path) $(host) $(port) $(config) +arguments= -l $(log_path) -p $(port) -s $(syn_log) -r $(reg_exp) ## Enable Chirp +WantIOProxy = true input = /dev/null -output = ./out/transmission.out.$(Node) -error = ./err/transmission.err.$(Node) -log = ./log/transmission.log +output = ./out/sslMain.out.$(Node) +error = ./err/sslMain.err.$(Node) +log = ./log/sslMain.log getenv = true #+SrcPath = "$(SRC_PATH)" @@ -59,7 +53,7 @@ getenv = true +ParallelShutdownPolicy = "WAIT_FOR_ALL" -transfer_input_files = TimedExec.py,IDPLException.py,CondorTools.py,socketServer.py,socketClient.py,fileReader.py,xmlReader.py +transfer_input_files = TimedExec.py,IDPLException.py,CondorTools.py,sslMover.py should_transfer_files = YES when_to_transfer_output = ON_EXIT diff --git a/synchronizer/submit.dag b/synchronizer/submit.dag new file mode 100644 index 0000000..26fc6e0 --- /dev/null +++ b/synchronizer/submit.dag @@ -0,0 +1,5 @@ +JOB A ssl_move-submit +JOB B ssl_move-submit +VARS A log_path="/tmp/testbykun/test.txt" syn_log="/tmp/testbykun/test1.txt" +VARS B log_path="/tmp/testbykun/k2b.txt" syn_log="/tmp/testbykun/k2b1.txt" +PARENT A CHILD B diff --git a/synchronizer/submit.dag.cron b/synchronizer/submit.dag.cron new file mode 100644 index 0000000..1288f50 --- /dev/null +++ b/synchronizer/submit.dag.cron @@ -0,0 +1,6 @@ +# run the dag job periodly +cron_minute = 55 +cron_hour = 11,23 +cron_window = 120 +on_exit_remove = false +universe = local diff --git a/synchronizer/transmission.py b/synchronizer/transmission.py deleted file mode 100755 index 09963a6..0000000 --- a/synchronizer/transmission.py +++ /dev/null @@ -1,55 +0,0 @@ -#! /usr/bin/env python -import os,time -import sys -import socketClient -import socketServer -import socket -import TimedExec -from IDPLException import * - - -def myprint(string): - print("*********************************************************\n" + string) - -## ***************************** -## main routine -## ***************************** - -print('************************' + time.strftime('%Y-%m-%d %H:%M:%S',time.localtime(time.time())) + '************************\n') -serverTimeout = 120 -clientTimeout = 120 -server = "./socketServer.py" -client = "./socketClient.py" - -if len(sys.argv) < 3: - print ("val number error") -for string in sys.argv: - print string - -path = sys.argv[1] -host = "" -port = sys.argv[2] -config = sys.argv[3] - -print path, host, port, config -if int(os.environ['_CONDOR_PROCNO']) == 0: - print ("client start") - client = socketClient.Client(config) - client.demand() - #print socket.gethostname(), socket.gethostbyname(socket.gethostname()) - #resultcode,output,err=TimedExec.runTimedCmd(clientTimeout,[client, config]) - #print(output) - #if resultcode < 0: - #print("Timeout! Result code: %d" % resultcode) - #print(err) - #raise TimeOutException("client") -else: - print("server start") - server = socketServer.Server(path, host, port) - server.serve() - #resultcode,output,err=TimedExec.runTimedCmd(serverTimeout, [server, path, host, port]) - #print(output) - #if resultcode < 0: - #print("Result code: %d" % resultcode) - #print(err) - #raise TimeOutException("server") diff --git a/synchronizer/xmlReader.py b/synchronizer/xmlReader.py deleted file mode 100755 index f493e4d..0000000 --- a/synchronizer/xmlReader.py +++ /dev/null @@ -1,63 +0,0 @@ -#!/usr/bin/env python -# encoding: UTF-8 - -from xml.etree import ElementTree as ET -import os - -class XmlHandler: - def __init__(self, xmlfile): - print ("xmlHandler init: " + xmlfile) - self.xmlTree = self.readXml(xmlfile) - - def readXml(self, in_path): - if not os.path.exists(in_path): - print ("there is no such file: " + in_path) - sys.exit() - try: - tree = ET.parse(in_path) - except: - print ("tree parse error") - print ("return tree successfully") - return tree - - def getNodes(self, tree): - root = tree.getroot() - print ("return root successfully") - return root.getchildren() - - def findNode(self, nodes, tag): - for node in nodes: - if node.tag == tag: - return node - - def getTexts(self, nodes, tags): - texts = [] - for tag in tags: - texts.append(self.findNode(nodes, tag).text) - return texts - - def read(self): - nodes = self.getNodes(self.xmlTree) - host, port, path, timestamp, offset= self.getTexts(nodes, ["host", "port", "path", "timestamp", "offset"]) - return host, port, path, timestamp, offset - - def writeXml(self, node, text): - node.text = text - #print node.tag, node.text - - def setTexts(self, texts, tags): - nodes = self.getNodes(self.xmlTree) - for text, tag in zip(texts, tags): - self.writeXml(self.findNode(nodes, tag), text) - - def write(self, newTimestamp, newOffset, xmlfile): - #int "time is " + newTimestamp - self.setTexts([newTimestamp, newOffset], ["timestamp", "offset"]) - self.xmlTree.write(xmlfile, encoding="utf-8") - - -if __name__ == '__main__': - xmlHandler = XmlHandler("client_config.xml") - print xmlHandler.read() - #xmlHandler.write("newTimestamp", "newOffset", "client_config.xml") - print xmlHandler.read()