diff --git a/core/src/main/bin/dxprof_py3.py b/core/src/main/bin/dxprof_py3.py
new file mode 100644
index 00000000..f325b6b6
--- /dev/null
+++ b/core/src/main/bin/dxprof_py3.py
@@ -0,0 +1,196 @@
+#!/usr/bin/env python3
+# vim: set expandtab tabstop=4 shiftwidth=4 foldmethod=marker nu:
+
+import re
+import sys
+import time
+
+REG_SQL_WAKE = re.compile(r'Begin\s+to\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
+REG_SQL_DONE = re.compile(r'Finished\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
+REG_SQL_PATH = re.compile(r'from\s+(\w+)(\s+where|\s*$)', re.IGNORECASE)
+REG_SQL_JDBC = re.compile(r'jdbcUrl:\s*\[(.+?)\]', re.IGNORECASE)
+REG_SQL_UUID = re.compile(r'(\d+-)+reader')
+REG_COMMIT_UUID = re.compile(r'(\d+-)+writer')
+REG_COMMIT_WAKE = re.compile(r'begin\s+to\s+commit\s+blocks', re.IGNORECASE)
+REG_COMMIT_DONE = re.compile(r'commit\s+blocks\s+ok', re.IGNORECASE)
+
+# {{{ function parse_timestamp() #
+def parse_timestamp(line):
+    try:
+        ts = int(time.mktime(time.strptime(line[0:19], '%Y-%m-%d %H:%M:%S')))
+    except (ValueError, OverflowError):
+        ts = 0
+
+    return ts
+# }}} #
+
+# {{{ function parse_query_host() #
+def parse_query_host(line):
+    ori = REG_SQL_JDBC.search(line)
+    if not ori:
+        return ''
+
+    ori = ori.group(1).split('?')[0]
+    off = ori.find('@')
+    if off > -1:
+        ori = ori[off + 1:]
+    else:
+        off = ori.find('//')
+        if off > -1:
+            ori = ori[off + 2:]
+
+    return ori.lower()
+# }}} #
+
+# {{{ function parse_query_table() #
+def parse_query_table(line):
+    ori = REG_SQL_PATH.search(line)
+    return (ori and ori.group(1).lower()) or ''
+# }}} #
+
+# {{{ function parse_task() #
+def parse_task(fname):
+    global LAST_SQL_UUID
+    global LAST_COMMIT_UUID
+    global DATAX_JOBDICT
+    global DATAX_JOBDICT_COMMIT
+    global UNIXTIME
+    LAST_SQL_UUID = ''
+    DATAX_JOBDICT = {}
+    LAST_COMMIT_UUID = ''
+    DATAX_JOBDICT_COMMIT = {}
+
+    UNIXTIME = int(time.time())
+    with open(fname, 'r') as f:
+        for line in f:
+            line = line.strip()
+
+            # the jdbcUrl is expected on the line that follows the wake-up
+            # line, so patch the host of the task recorded just before
+            if LAST_SQL_UUID and (LAST_SQL_UUID in DATAX_JOBDICT):
+                DATAX_JOBDICT[LAST_SQL_UUID]['host'] = parse_query_host(line)
+                LAST_SQL_UUID = ''
+
+            if 'CommonRdbmsReader$Task' in line:
+                parse_read_task(line)
+            elif 'commit blocks' in line:
+                parse_write_task(line)
+# }}} #
+
+# {{{ function parse_read_task() #
+def parse_read_task(line):
+    global LAST_SQL_UUID
+
+    ser = REG_SQL_UUID.search(line)
+    if not ser:
+        return
+
+    LAST_SQL_UUID = ser.group()
+    if REG_SQL_WAKE.search(line):
+        DATAX_JOBDICT[LAST_SQL_UUID] = {
+            'stat': 'R',
+            'wake': parse_timestamp(line),
+            'done': UNIXTIME,
+            'host': parse_query_host(line),
+            'path': parse_query_table(line)
+        }
+    elif (LAST_SQL_UUID in DATAX_JOBDICT) and REG_SQL_DONE.search(line):
+        DATAX_JOBDICT[LAST_SQL_UUID]['stat'] = 'D'
+        DATAX_JOBDICT[LAST_SQL_UUID]['done'] = parse_timestamp(line)
+# }}} #
+
+# {{{ function parse_write_task() #
+def parse_write_task(line):
+    global LAST_COMMIT_UUID
+
+    ser = REG_COMMIT_UUID.search(line)
+    if not ser:
+        return
+
+    LAST_COMMIT_UUID = ser.group()
+    if REG_COMMIT_WAKE.search(line):
+        DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID] = {
+            'stat': 'R',
+            'wake': parse_timestamp(line),
+            'done': UNIXTIME,
+        }
+    elif (LAST_COMMIT_UUID in DATAX_JOBDICT_COMMIT) and REG_COMMIT_DONE.search(line):
+        DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['stat'] = 'D'
+        DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['done'] = parse_timestamp(line)
+# }}} #
+
+# {{{ function result_analyse() #
+def result_analyse():
+    tasklist = []
+    hostsmap = {}
+    statvars = {'sum': 0, 'cnt': 0, 'svr': 0, 'max': 0, 'min': int(time.time())}
+    tasklist_commit = []
+    statvars_commit = {'sum': 0, 'cnt': 0}
+
+    for idx in DATAX_JOBDICT:
+        item = DATAX_JOBDICT[idx]
+        item['uuid'] = idx
+        item['cost'] = item['done'] - item['wake']
+        tasklist.append(item)
+
+        if item['host'] not in hostsmap:
+            hostsmap[item['host']] = 1
+            statvars['svr'] += 1
+
+        if -1 < item['cost'] < 864000:  # skip costs longer than ten days
+            statvars['sum'] += item['cost']
+            statvars['cnt'] += 1
+            statvars['max'] = max(statvars['max'], item['done'])
+            statvars['min'] = min(statvars['min'], item['wake'])
+
+    for idx in DATAX_JOBDICT_COMMIT:
+        itemc = DATAX_JOBDICT_COMMIT[idx]
+        itemc['uuid'] = idx
+        itemc['cost'] = itemc['done'] - itemc['wake']
+        tasklist_commit.append(itemc)
+
+        if -1 < itemc['cost'] < 864000:
+            statvars_commit['sum'] += itemc['cost']
+            statvars_commit['cnt'] += 1
+
+    ttl = (statvars['max'] - statvars['min']) or 1
+    idx = float(statvars['cnt']) / (statvars['sum'] or ttl)
+
+    # Python 3 dropped the cmp argument of list.sort(); sort by cost
+    # in descending order with key/reverse instead
+    tasklist.sort(key=lambda item: item['cost'], reverse=True)
+    for item in tasklist:
+        print('%s\t%s.%s\t%s\t%s\t% 4d\t% 2.1f%%\t% .2f' % (item['stat'], item['host'], item['path'],
+              time.strftime('%H:%M:%S', time.localtime(item['wake'])),
+              time.strftime('%H:%M:%S', time.localtime(item['done'])) if item['stat'] == 'D' else '--',
+              item['cost'], 100 * item['cost'] / ttl, idx * item['cost']))
+
+    if not tasklist or not statvars['cnt']:
+        return
+
+    print('\n--- DataX Profiling Statistics ---')
+    print('%d task(s) on %d server(s), total elapsed %d second(s), %.2f second(s) per task on average' % (statvars['cnt'],
+          statvars['svr'], statvars['sum'], float(statvars['sum']) / statvars['cnt']))
+    print('Actual cost %d second(s) (%s - %s), task concurrency: %.2f, tilt index: %.2f' % (ttl,
+          time.strftime('%H:%M:%S', time.localtime(statvars['min'])),
+          time.strftime('%H:%M:%S', time.localtime(statvars['max'])),
+          float(statvars['sum']) / ttl, idx * tasklist[0]['cost']))
+
+    if not tasklist_commit or not statvars_commit['cnt']:
+        return
+
+    idx_commit = float(statvars_commit['cnt']) / (statvars_commit['sum'] or ttl)
+    tasklist_commit.sort(key=lambda item: item['cost'], reverse=True)
+    print('%d task(s) done odps commit, total elapsed %d second(s), %.2f second(s) per task on average, tilt index: %.2f' % (
+          statvars_commit['cnt'],
+          statvars_commit['sum'], float(statvars_commit['sum']) / statvars_commit['cnt'],
+          idx_commit * tasklist_commit[0]['cost']))
+# }}} #
+
+if __name__ == '__main__':
+    if len(sys.argv) < 2:
+        print('Usage: %s filename' % (sys.argv[0]))
+        sys.exit(1)
+
+    parse_task(sys.argv[1])
+    result_analyse()
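Usage sketch (the log-file name below is hypothetical; per the script's own usage message it takes a single DataX job log as its only argument, prints one row per reader task, then the aggregate statistics):

    python3 core/src/main/bin/dxprof_py3.py datax-job.log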