mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-03 04:11:34 +08:00
dxprof_py3.py
This commit is contained in:
parent
fce550e2ab
commit
c96ea3542f
192
core/src/main/bin/dxprof_py3.py
Normal file
192
core/src/main/bin/dxprof_py3.py
Normal file
@ -0,0 +1,192 @@
|
||||
#! /usr/bin/env python
|
||||
# vim: set expandtab tabstop=4 shiftwidth=4 foldmethod=marker nu:
|
||||
|
||||
import re
|
||||
import sys
|
||||
import time
|
||||
|
||||
REG_SQL_WAKE = re.compile(r'Begin\s+to\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
|
||||
REG_SQL_DONE = re.compile(r'Finished\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
|
||||
REG_SQL_PATH = re.compile(r'from\s+(\w+)(\s+where|\s*$)', re.IGNORECASE)
|
||||
REG_SQL_JDBC = re.compile(r'jdbcUrl:\s*\[(.+?)\]', re.IGNORECASE)
|
||||
REG_SQL_UUID = re.compile(r'(\d+\-)+reader')
|
||||
REG_COMMIT_UUID = re.compile(r'(\d+\-)+writer')
|
||||
REG_COMMIT_WAKE = re.compile(r'begin\s+to\s+commit\s+blocks', re.IGNORECASE)
|
||||
REG_COMMIT_DONE = re.compile(r'commit\s+blocks\s+ok', re.IGNORECASE)
|
||||
|
||||
# {{{ function parse_timestamp() #
|
||||
def parse_timestamp(line):
|
||||
try:
|
||||
ts = int(time.mktime(time.strptime(line[0:19], '%Y-%m-%d %H:%M:%S')))
|
||||
except:
|
||||
ts = 0
|
||||
|
||||
return ts
|
||||
|
||||
# }}} #
|
||||
|
||||
# {{{ function parse_query_host() #
|
||||
def parse_query_host(line):
|
||||
ori = REG_SQL_JDBC.search(line)
|
||||
if (not ori):
|
||||
return ''
|
||||
|
||||
ori = ori.group(1).split('?')[0]
|
||||
off = ori.find('@')
|
||||
if (off > -1):
|
||||
ori = ori[off+1:len(ori)]
|
||||
else:
|
||||
off = ori.find('//')
|
||||
if (off > -1):
|
||||
ori = ori[off+2:len(ori)]
|
||||
|
||||
return ori.lower()
|
||||
# }}} #
|
||||
|
||||
# {{{ function parse_query_table() #
|
||||
def parse_query_table(line):
|
||||
ori = REG_SQL_PATH.search(line)
|
||||
return (ori and ori.group(1).lower()) or ''
|
||||
# }}} #
|
||||
|
||||
# {{{ function parse_reader_task() #
|
||||
def parse_task(fname):
|
||||
global LAST_SQL_UUID
|
||||
global LAST_COMMIT_UUID
|
||||
global DATAX_JOBDICT
|
||||
global DATAX_JOBDICT_COMMIT
|
||||
global UNIXTIME
|
||||
LAST_SQL_UUID = ''
|
||||
DATAX_JOBDICT = {}
|
||||
LAST_COMMIT_UUID = ''
|
||||
DATAX_JOBDICT_COMMIT = {}
|
||||
|
||||
UNIXTIME = int(time.time())
|
||||
with open(fname, 'r') as f:
|
||||
for line in f.readlines():
|
||||
line = line.strip()
|
||||
|
||||
if (LAST_SQL_UUID and (LAST_SQL_UUID in DATAX_JOBDICT)):
|
||||
DATAX_JOBDICT[LAST_SQL_UUID]['host'] = parse_query_host(line)
|
||||
LAST_SQL_UUID = ''
|
||||
|
||||
if line.find('CommonRdbmsReader$Task') > 0:
|
||||
parse_read_task(line)
|
||||
elif line.find('commit blocks') > 0:
|
||||
parse_write_task(line)
|
||||
else:
|
||||
continue
|
||||
# }}} #
|
||||
|
||||
# {{{ function parse_read_task() #
|
||||
def parse_read_task(line):
|
||||
ser = REG_SQL_UUID.search(line)
|
||||
if not ser:
|
||||
return
|
||||
|
||||
LAST_SQL_UUID = ser.group()
|
||||
if REG_SQL_WAKE.search(line):
|
||||
DATAX_JOBDICT[LAST_SQL_UUID] = {
|
||||
'stat' : 'R',
|
||||
'wake' : parse_timestamp(line),
|
||||
'done' : UNIXTIME,
|
||||
'host' : parse_query_host(line),
|
||||
'path' : parse_query_table(line)
|
||||
}
|
||||
elif ((LAST_SQL_UUID in DATAX_JOBDICT) and REG_SQL_DONE.search(line)):
|
||||
DATAX_JOBDICT[LAST_SQL_UUID]['stat'] = 'D'
|
||||
DATAX_JOBDICT[LAST_SQL_UUID]['done'] = parse_timestamp(line)
|
||||
# }}} #
|
||||
|
||||
# {{{ function parse_write_task() #
|
||||
def parse_write_task(line):
|
||||
ser = REG_COMMIT_UUID.search(line)
|
||||
if not ser:
|
||||
return
|
||||
|
||||
LAST_COMMIT_UUID = ser.group()
|
||||
if REG_COMMIT_WAKE.search(line):
|
||||
DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID] = {
|
||||
'stat' : 'R',
|
||||
'wake' : parse_timestamp(line),
|
||||
'done' : UNIXTIME,
|
||||
}
|
||||
elif ((LAST_COMMIT_UUID in DATAX_JOBDICT_COMMIT) and REG_COMMIT_DONE.search(line)):
|
||||
DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['stat'] = 'D'
|
||||
DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['done'] = parse_timestamp(line)
|
||||
# }}} #
|
||||
|
||||
# {{{ function result_analyse() #
|
||||
def result_analyse():
|
||||
def compare(a, b):
|
||||
return b['cost'] - a['cost']
|
||||
|
||||
tasklist = []
|
||||
hostsmap = {}
|
||||
statvars = {'sum' : 0, 'cnt' : 0, 'svr' : 0, 'max' : 0, 'min' : int(time.time())}
|
||||
tasklist_commit = []
|
||||
statvars_commit = {'sum' : 0, 'cnt' : 0}
|
||||
|
||||
for idx in DATAX_JOBDICT:
|
||||
item = DATAX_JOBDICT[idx]
|
||||
item['uuid'] = idx;
|
||||
item['cost'] = item['done'] - item['wake']
|
||||
tasklist.append(item);
|
||||
|
||||
if (not (item['host'] in hostsmap)):
|
||||
hostsmap[item['host']] = 1
|
||||
statvars['svr'] += 1
|
||||
|
||||
if (item['cost'] > -1 and item['cost'] < 864000):
|
||||
statvars['sum'] += item['cost']
|
||||
statvars['cnt'] += 1
|
||||
statvars['max'] = max(statvars['max'], item['done'])
|
||||
statvars['min'] = min(statvars['min'], item['wake'])
|
||||
|
||||
for idx in DATAX_JOBDICT_COMMIT:
|
||||
itemc = DATAX_JOBDICT_COMMIT[idx]
|
||||
itemc['uuid'] = idx
|
||||
itemc['cost'] = itemc['done'] - itemc['wake']
|
||||
tasklist_commit.append(itemc)
|
||||
|
||||
if (itemc['cost'] > -1 and itemc['cost'] < 864000):
|
||||
statvars_commit['sum'] += itemc['cost']
|
||||
statvars_commit['cnt'] += 1
|
||||
|
||||
ttl = (statvars['max'] - statvars['min']) or 1
|
||||
idx = float(statvars['cnt']) / (statvars['sum'] or ttl)
|
||||
|
||||
tasklist.sort(compare)
|
||||
for item in tasklist:
|
||||
print('%s\t%s.%s\t%s\t%s\t% 4d\t% 2.1f%%\t% .2f' %(item['stat'], item['host'], item['path'],
|
||||
time.strftime('%H:%M:%S', time.localtime(item['wake'])),
|
||||
(('D' == item['stat']) and time.strftime('%H:%M:%S', time.localtime(item['done']))) or '--',
|
||||
item['cost'], 100 * item['cost'] / ttl, idx * item['cost']))
|
||||
|
||||
if (not len(tasklist) or not statvars['cnt']):
|
||||
return
|
||||
|
||||
print('\n--- DataX Profiling Statistics ---')
|
||||
print('%d task(s) on %d server(s), Total elapsed %d second(s), %.2f second(s) per task in average' %(statvars['cnt'],
|
||||
statvars['svr'], statvars['sum'], float(statvars['sum']) / statvars['cnt']))
|
||||
print('Actually cost %d second(s) (%s - %s), task concurrency: %.2f, tilt index: %.2f' %(ttl,
|
||||
time.strftime('%H:%M:%S', time.localtime(statvars['min'])),
|
||||
time.strftime('%H:%M:%S', time.localtime(statvars['max'])),
|
||||
float(statvars['sum']) / ttl, idx * tasklist[0]['cost']))
|
||||
|
||||
idx_commit = float(statvars_commit['cnt']) / (statvars_commit['sum'] or ttl)
|
||||
tasklist_commit.sort(compare)
|
||||
print('%d task(s) done odps comit, Total elapsed %d second(s), %.2f second(s) per task in average, tilt index: %.2f' % (
|
||||
statvars_commit['cnt'],
|
||||
statvars_commit['sum'], float(statvars_commit['sum']) / statvars_commit['cnt'],
|
||||
idx_commit * tasklist_commit[0]['cost']))
|
||||
|
||||
# }}} #
|
||||
|
||||
if (len(sys.argv) < 2):
|
||||
print("Usage: %s filename" %(sys.argv[0]))
|
||||
quit(1)
|
||||
else:
|
||||
parse_task(sys.argv[1])
|
||||
result_analyse()
|
||||
|
Loading…
Reference in New Issue
Block a user