Mirror of https://github.com/alibaba/DataX.git (synced 2025-05-02 04:40:54 +08:00)
Commit 1367a179a5: Merge c594154df5 into 0824b45c5e

core/src/main/bin/datax_py3.py (new file, 227 lines)
@@ -0,0 +1,227 @@
#!/usr/bin/env python
# -*- coding:utf-8 -*-

import sys
import os
import signal
import subprocess
import time
import re
import socket
import json
from optparse import OptionParser
from optparse import OptionGroup
from string import Template
import codecs
import platform


def isWindows():
    return platform.system() == 'Windows'


DATAX_HOME = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))

DATAX_VERSION = 'DATAX-OPENSOURCE-3.0'
if isWindows():
    codecs.register(lambda name: name == 'cp65001' and codecs.lookup('utf-8') or None)
    CLASS_PATH = ("%s/lib/*") % (DATAX_HOME)
else:
    CLASS_PATH = ("%s/lib/*:.") % (DATAX_HOME)
LOGBACK_FILE = ("%s/conf/logback.xml") % (DATAX_HOME)
DEFAULT_JVM = "-Xms1g -Xmx1g -XX:+HeapDumpOnOutOfMemoryError -XX:HeapDumpPath=%s/log" % (DATAX_HOME)
DEFAULT_PROPERTY_CONF = "-Dfile.encoding=UTF-8 -Dlogback.statusListenerClass=ch.qos.logback.core.status.NopStatusListener -Djava.security.egd=file:///dev/urandom -Ddatax.home=%s -Dlogback.configurationFile=%s" % (
    DATAX_HOME, LOGBACK_FILE)
ENGINE_COMMAND = "java -server ${jvm} %s -classpath %s ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}" % (
    DEFAULT_PROPERTY_CONF, CLASS_PATH)
REMOTE_DEBUG_CONFIG = "-Xdebug -Xrunjdwp:transport=dt_socket,server=y,address=9999"

RET_STATE = {
    "KILL": 143,
    "FAIL": -1,
    "OK": 0,
    "RUN": 1,
    "RETRY": 2
}


def getLocalIp():
    try:
        return socket.gethostbyname(socket.getfqdn(socket.gethostname()))
    except:
        return "Unknown"


def suicide(signum, e):
    global child_process
    print("[Error] DataX receive unexpected signal %d, starts to suicide." % (signum), file=sys.stderr)

    if child_process:
        child_process.send_signal(signal.SIGQUIT)
        time.sleep(1)
        child_process.kill()
    print("DataX process was killed! Did you do it?", file=sys.stderr)
    sys.exit(RET_STATE["KILL"])


def register_signal():
    if not isWindows():
        global child_process
        signal.signal(2, suicide)
        signal.signal(3, suicide)
        signal.signal(15, suicide)


def getOptionParser():
    usage = "usage: %prog [options] job-url-or-path"
    parser = OptionParser(usage=usage)

    prodEnvOptionGroup = OptionGroup(parser, "Product Env Options",
                                     "Normal user use these options to set jvm parameters, job runtime mode etc. "
                                     "Make sure these options can be used in Product Env.")
    prodEnvOptionGroup.add_option("-j", "--jvm", metavar="<jvm parameters>", dest="jvmParameters", action="store",
                                  default=DEFAULT_JVM, help="Set jvm parameters if necessary.")
    prodEnvOptionGroup.add_option("--jobid", metavar="<job unique id>", dest="jobid", action="store", default="-1",
                                  help="Set job unique id when running by Distribute/Local Mode.")
    prodEnvOptionGroup.add_option("-m", "--mode", metavar="<job runtime mode>",
                                  action="store", default="standalone",
                                  help="Set job runtime mode such as: standalone, local, distribute. "
                                       "Default mode is standalone.")
    prodEnvOptionGroup.add_option("-p", "--params", metavar="<parameter used in job config>",
                                  action="store", dest="params",
                                  help='Set job parameter, eg: the source tableName you want to set it by command, '
                                       'then you can use it like this: -p"-DtableName=your-table-name", '
                                       'if you have multiple parameters: -p"-DtableName=your-table-name -DcolumnName=your-column-name". '
                                       'Note: you should configure tableName in your job with ${tableName}.')
    prodEnvOptionGroup.add_option("-r", "--reader", metavar="<parameter used in view job config[reader] template>",
                                  action="store", dest="reader", type="string",
                                  help='View job config[reader] template, eg: mysqlreader,streamreader')
    prodEnvOptionGroup.add_option("-w", "--writer", metavar="<parameter used in view job config[writer] template>",
                                  action="store", dest="writer", type="string",
                                  help='View job config[writer] template, eg: mysqlwriter,streamwriter')
    parser.add_option_group(prodEnvOptionGroup)

    devEnvOptionGroup = OptionGroup(parser, "Develop/Debug Options",
                                    "Developer use these options to trace more details of DataX.")
    devEnvOptionGroup.add_option("-d", "--debug", dest="remoteDebug", action="store_true",
                                 help="Set to remote debug mode.")
    devEnvOptionGroup.add_option("--loglevel", metavar="<log level>", dest="loglevel", action="store",
                                 default="info", help="Set log level such as: debug, info, all etc.")
    parser.add_option_group(devEnvOptionGroup)
    return parser


def generateJobConfigTemplate(reader, writer):
    readerRef = "Please refer to the %s document:\n https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n" % (reader, reader, reader)
    writerRef = "Please refer to the %s document:\n https://github.com/alibaba/DataX/blob/master/%s/doc/%s.md \n " % (writer, writer, writer)
    print(readerRef)
    print(writerRef)
    jobGuid = 'Please save the following configuration as a json file and use\n python {DATAX_HOME}/bin/datax.py {JSON_FILE_NAME}.json \nto run the job.\n'
    print(jobGuid)
    jobTemplate = {
        "job": {
            "setting": {
                "speed": {
                    "channel": ""
                }
            },
            "content": [
                {
                    "reader": {},
                    "writer": {}
                }
            ]
        }
    }
    readerTemplatePath = "%s/plugin/reader/%s/plugin_job_template.json" % (DATAX_HOME, reader)
    writerTemplatePath = "%s/plugin/writer/%s/plugin_job_template.json" % (DATAX_HOME, writer)
    # fall back to an empty section if a plugin template cannot be read
    readerPar = {}
    writerPar = {}
    try:
        readerPar = readPluginTemplate(readerTemplatePath)
    except Exception as e:
        print("Read reader[%s] template error: can't find file %s" % (reader, readerTemplatePath))
    try:
        writerPar = readPluginTemplate(writerTemplatePath)
    except Exception as e:
        print("Read writer[%s] template error: can't find file %s" % (writer, writerTemplatePath))
    jobTemplate['job']['content'][0]['reader'] = readerPar
    jobTemplate['job']['content'][0]['writer'] = writerPar
    print(json.dumps(jobTemplate, indent=4, sort_keys=True))


def readPluginTemplate(plugin):
    with open(plugin, 'r') as f:
        return json.load(f)


def isUrl(path):
    if not path:
        return False

    assert (isinstance(path, str))
    m = re.match(r"^http[s]?://\S+\w*", path.lower())
    if m:
        return True
    else:
        return False


def buildStartCommand(options, args):
    commandMap = {}
    tempJVMCommand = DEFAULT_JVM
    if options.jvmParameters:
        tempJVMCommand = tempJVMCommand + " " + options.jvmParameters

    if options.remoteDebug:
        tempJVMCommand = tempJVMCommand + " " + REMOTE_DEBUG_CONFIG
        print('local ip: ', getLocalIp())

    if options.loglevel:
        tempJVMCommand = tempJVMCommand + " " + ("-Dloglevel=%s" % (options.loglevel))

    if options.mode:
        commandMap["mode"] = options.mode

    # jobResource may be a URL, or a local file path (relative or absolute)
    jobResource = args[0]
    if not isUrl(jobResource):
        jobResource = os.path.abspath(jobResource)
        if jobResource.lower().startswith("file://"):
            jobResource = jobResource[len("file://"):]

    jobParams = ("-Dlog.file.name=%s") % (jobResource[-20:].replace('/', '_').replace('.', '_'))
    if options.params:
        jobParams = jobParams + " " + options.params

    if options.jobid:
        commandMap["jobid"] = options.jobid

    commandMap["jvm"] = tempJVMCommand
    commandMap["params"] = jobParams
    commandMap["job"] = jobResource

    return Template(ENGINE_COMMAND).substitute(**commandMap)


def printCopyright():
    print('''
DataX (%s), From Alibaba !
Copyright (C) 2010-2017, Alibaba Group. All Rights Reserved.

''' % DATAX_VERSION)
    sys.stdout.flush()


if __name__ == "__main__":
    printCopyright()
    parser = getOptionParser()
    options, args = parser.parse_args(sys.argv[1:])
    if options.reader is not None and options.writer is not None:
        generateJobConfigTemplate(options.reader, options.writer)
        sys.exit(RET_STATE['OK'])
    if len(args) != 1:
        parser.print_help()
        sys.exit(RET_STATE['FAIL'])

    startCommand = buildStartCommand(options, args)
    # print startCommand

    child_process = subprocess.Popen(startCommand, shell=True)
    register_signal()
    (stdout, stderr) = child_process.communicate()

    sys.exit(child_process.returncode)
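
# Illustrative sketch (not part of datax_py3.py above): buildStartCommand() fills
# ENGINE_COMMAND, a string.Template pattern, from commandMap. The values below are
# hypothetical and only show the substitution mechanics.
from string import Template

ENGINE_COMMAND = "java -server ${jvm} -classpath lib/* ${params} com.alibaba.datax.core.Engine -mode ${mode} -jobid ${jobid} -job ${job}"
commandMap = {
    "jvm": "-Xms1g -Xmx1g",                 # hypothetical JVM flags
    "params": "-Dlog.file.name=job_json",   # hypothetical log file name
    "mode": "standalone",
    "jobid": "-1",
    "job": "/tmp/job.json",                 # hypothetical job file path
}
print(Template(ENGINE_COMMAND).substitute(**commandMap))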
core/src/main/bin/dxprof_py3.py (new file, 192 lines)
@@ -0,0 +1,192 @@
#! /usr/bin/env python
# vim: set expandtab tabstop=4 shiftwidth=4 foldmethod=marker nu:

import re
import sys
import time
# list.sort() no longer accepts a bare comparison function on Python 3,
# so result_analyse() wraps its comparator with cmp_to_key below
from functools import cmp_to_key

REG_SQL_WAKE = re.compile(r'Begin\s+to\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
REG_SQL_DONE = re.compile(r'Finished\s+read\s+record\s+by\s+Sql', re.IGNORECASE)
REG_SQL_PATH = re.compile(r'from\s+(\w+)(\s+where|\s*$)', re.IGNORECASE)
REG_SQL_JDBC = re.compile(r'jdbcUrl:\s*\[(.+?)\]', re.IGNORECASE)
REG_SQL_UUID = re.compile(r'(\d+\-)+reader')
REG_COMMIT_UUID = re.compile(r'(\d+\-)+writer')
REG_COMMIT_WAKE = re.compile(r'begin\s+to\s+commit\s+blocks', re.IGNORECASE)
REG_COMMIT_DONE = re.compile(r'commit\s+blocks\s+ok', re.IGNORECASE)

# {{{ function parse_timestamp() #
def parse_timestamp(line):
    try:
        ts = int(time.mktime(time.strptime(line[0:19], '%Y-%m-%d %H:%M:%S')))
    except:
        ts = 0

    return ts

# }}} #

# {{{ function parse_query_host() #
def parse_query_host(line):
    ori = REG_SQL_JDBC.search(line)
    if (not ori):
        return ''

    ori = ori.group(1).split('?')[0]
    off = ori.find('@')
    if (off > -1):
        ori = ori[off+1:len(ori)]
    else:
        off = ori.find('//')
        if (off > -1):
            ori = ori[off+2:len(ori)]

    return ori.lower()
# }}} #

# {{{ function parse_query_table() #
def parse_query_table(line):
    ori = REG_SQL_PATH.search(line)
    return (ori and ori.group(1).lower()) or ''
# }}} #

# {{{ function parse_task() #
def parse_task(fname):
    global LAST_SQL_UUID
    global LAST_COMMIT_UUID
    global DATAX_JOBDICT
    global DATAX_JOBDICT_COMMIT
    global UNIXTIME
    LAST_SQL_UUID = ''
    DATAX_JOBDICT = {}
    LAST_COMMIT_UUID = ''
    DATAX_JOBDICT_COMMIT = {}

    UNIXTIME = int(time.time())
    with open(fname, 'r') as f:
        for line in f.readlines():
            line = line.strip()

            if (LAST_SQL_UUID and (LAST_SQL_UUID in DATAX_JOBDICT)):
                DATAX_JOBDICT[LAST_SQL_UUID]['host'] = parse_query_host(line)
                LAST_SQL_UUID = ''

            if line.find('CommonRdbmsReader$Task') > 0:
                parse_read_task(line)
            elif line.find('commit blocks') > 0:
                parse_write_task(line)
            else:
                continue
# }}} #

# {{{ function parse_read_task() #
def parse_read_task(line):
    # share LAST_SQL_UUID with parse_task(), which fills in the host from the following log line
    global LAST_SQL_UUID
    ser = REG_SQL_UUID.search(line)
    if not ser:
        return

    LAST_SQL_UUID = ser.group()
    if REG_SQL_WAKE.search(line):
        DATAX_JOBDICT[LAST_SQL_UUID] = {
            'stat' : 'R',
            'wake' : parse_timestamp(line),
            'done' : UNIXTIME,
            'host' : parse_query_host(line),
            'path' : parse_query_table(line)
        }
    elif ((LAST_SQL_UUID in DATAX_JOBDICT) and REG_SQL_DONE.search(line)):
        DATAX_JOBDICT[LAST_SQL_UUID]['stat'] = 'D'
        DATAX_JOBDICT[LAST_SQL_UUID]['done'] = parse_timestamp(line)
# }}} #

# {{{ function parse_write_task() #
def parse_write_task(line):
    global LAST_COMMIT_UUID
    ser = REG_COMMIT_UUID.search(line)
    if not ser:
        return

    LAST_COMMIT_UUID = ser.group()
    if REG_COMMIT_WAKE.search(line):
        DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID] = {
            'stat' : 'R',
            'wake' : parse_timestamp(line),
            'done' : UNIXTIME,
        }
    elif ((LAST_COMMIT_UUID in DATAX_JOBDICT_COMMIT) and REG_COMMIT_DONE.search(line)):
        DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['stat'] = 'D'
        DATAX_JOBDICT_COMMIT[LAST_COMMIT_UUID]['done'] = parse_timestamp(line)
# }}} #

# {{{ function result_analyse() #
def result_analyse():
    def compare(a, b):
        return b['cost'] - a['cost']

    tasklist = []
    hostsmap = {}
    statvars = {'sum' : 0, 'cnt' : 0, 'svr' : 0, 'max' : 0, 'min' : int(time.time())}
    tasklist_commit = []
    statvars_commit = {'sum' : 0, 'cnt' : 0}

    for idx in DATAX_JOBDICT:
        item = DATAX_JOBDICT[idx]
        item['uuid'] = idx
        item['cost'] = item['done'] - item['wake']
        tasklist.append(item)

        if (not (item['host'] in hostsmap)):
            hostsmap[item['host']] = 1
            statvars['svr'] += 1

        if (item['cost'] > -1 and item['cost'] < 864000):
            statvars['sum'] += item['cost']
            statvars['cnt'] += 1
            statvars['max'] = max(statvars['max'], item['done'])
            statvars['min'] = min(statvars['min'], item['wake'])

    for idx in DATAX_JOBDICT_COMMIT:
        itemc = DATAX_JOBDICT_COMMIT[idx]
        itemc['uuid'] = idx
        itemc['cost'] = itemc['done'] - itemc['wake']
        tasklist_commit.append(itemc)

        if (itemc['cost'] > -1 and itemc['cost'] < 864000):
            statvars_commit['sum'] += itemc['cost']
            statvars_commit['cnt'] += 1

    ttl = (statvars['max'] - statvars['min']) or 1
    idx = float(statvars['cnt']) / (statvars['sum'] or ttl)

    # sort() takes a key function on Python 3, so wrap the old-style comparator
    tasklist.sort(key=cmp_to_key(compare))
    for item in tasklist:
        print('%s\t%s.%s\t%s\t%s\t% 4d\t% 2.1f%%\t% .2f' %(item['stat'], item['host'], item['path'],
              time.strftime('%H:%M:%S', time.localtime(item['wake'])),
              (('D' == item['stat']) and time.strftime('%H:%M:%S', time.localtime(item['done']))) or '--',
              item['cost'], 100 * item['cost'] / ttl, idx * item['cost']))

    if (not len(tasklist) or not statvars['cnt']):
        return

    print('\n--- DataX Profiling Statistics ---')
    print('%d task(s) on %d server(s), Total elapsed %d second(s), %.2f second(s) per task in average' %(statvars['cnt'],
          statvars['svr'], statvars['sum'], float(statvars['sum']) / statvars['cnt']))
    print('Actually cost %d second(s) (%s - %s), task concurrency: %.2f, tilt index: %.2f' %(ttl,
          time.strftime('%H:%M:%S', time.localtime(statvars['min'])),
          time.strftime('%H:%M:%S', time.localtime(statvars['max'])),
          float(statvars['sum']) / ttl, idx * tasklist[0]['cost']))

    idx_commit = float(statvars_commit['cnt']) / (statvars_commit['sum'] or ttl)
    tasklist_commit.sort(key=cmp_to_key(compare))
    print('%d task(s) done odps commit, Total elapsed %d second(s), %.2f second(s) per task in average, tilt index: %.2f' % (
          statvars_commit['cnt'],
          statvars_commit['sum'], float(statvars_commit['sum']) / statvars_commit['cnt'],
          idx_commit * tasklist_commit[0]['cost']))

# }}} #

if (len(sys.argv) < 2):
    print("Usage: %s filename" %(sys.argv[0]))
    quit(1)
else:
    parse_task(sys.argv[1])
    result_analyse()
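
# Illustrative sketch (not part of dxprof_py3.py above): how REG_SQL_JDBC and the
# parse_query_host() logic pull the database host out of a DataX log line. The log
# line below is made up for demonstration; real lines come from a DataX job log.
import re

REG_SQL_JDBC = re.compile(r'jdbcUrl:\s*\[(.+?)\]', re.IGNORECASE)

sample = '2017-01-01 12:00:00.000 [0-0-0-reader] INFO ... jdbcUrl:[jdbc:mysql://127.0.0.1:3306/demo?useUnicode=true].'
m = REG_SQL_JDBC.search(sample)
if m:
    ori = m.group(1).split('?')[0]          # jdbc:mysql://127.0.0.1:3306/demo
    off = ori.find('//')
    host = ori[off + 2:].lower() if off > -1 else ori.lower()
    print(host)                             # 127.0.0.1:3306/demo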
core/src/main/bin/perftrace_py3.py (new file, 401 lines)
@@ -0,0 +1,401 @@
#!/usr/bin/env python
# -*- coding:utf-8 -*-


"""
Life's short, Python more.
"""

import re
import os
import sys
import json
import uuid
import signal
import time
import subprocess
from optparse import OptionParser
# note: sys.setdefaultencoding() does not exist on Python 3; the default encoding is already UTF-8
##begin cli & help logic
|
||||
def getOptionParser():
|
||||
usage = getUsage()
|
||||
parser = OptionParser(usage = usage)
|
||||
#rdbms reader and writer
|
||||
parser.add_option('-r', '--reader', action='store', dest='reader', help='trace datasource read performance with specified !json! string')
|
||||
parser.add_option('-w', '--writer', action='store', dest='writer', help='trace datasource write performance with specified !json! string')
|
||||
|
||||
parser.add_option('-c', '--channel', action='store', dest='channel', default='1', help='the number of concurrent sync thread, the default is 1')
|
||||
parser.add_option('-f', '--file', action='store', help='existing datax configuration file, include reader and writer params')
|
||||
parser.add_option('-t', '--type', action='store', default='reader', help='trace which side\'s performance, cooperate with -f --file params, need to be reader or writer')
|
||||
parser.add_option('-d', '--delete', action='store', default='true', help='delete temporary files, the default value is true')
|
||||
#parser.add_option('-h', '--help', action='store', default='true', help='print usage information')
|
||||
return parser
|
||||
|
||||


def getUsage():
    return '''
The following params are available for -r --reader:
    [these params are for the rdbms reader, used to trace rdbms read performance; they are like datax's keys]
    *datasourceType: datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2 etc...
    *jdbcUrl:        datasource jdbc connection string, mysql as an example: jdbc:mysql://ip:port/database
    *username:       username for datasource
    *password:       password for datasource
    *table:          table name for read data
    column:          column to be read, the default value is ['*']
    splitPk:         the splitPk column of rdbms table
    where:           limit the scope of the performance data set
    fetchSize:       how many rows to be fetched at each communicate

    [these params are for the stream reader, used to trace rdbms write performance]
    reader-sliceRecordCount:  how many test records to mock (per channel), the default value is 10000
    reader-column          :  columns the stream reader uses to generate test data (types supported: string|long|date|double|bool|bytes; constant values and random functions are supported), demo: [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]

The following params are available for -w --writer:
    [these params are for the rdbms writer, used to trace rdbms write performance; they are like datax's keys]
    datasourceType:  datasource type, may be mysql|drds|oracle|ads|sqlserver|postgresql|db2|ads etc...
    *jdbcUrl:        datasource jdbc connection string, mysql as an example: jdbc:mysql://ip:port/database
    *username:       username for datasource
    *password:       password for datasource
    *table:          table name for write data
    column:          column to be written, the default value is ['*']
    batchSize:       how many rows to be stored at each communicate, the default value is 512
    preSql:          prepare sql to be executed before write data, the default value is ''
    postSql:         post sql to be executed at the end of write data, the default value is ''
    url:             required for ads, pattern is ip:port
    schme:           required for ads, ads database name

    [these params are for the stream writer, used to trace rdbms read performance]
    writer-print:    true means print data read from source datasource, the default value is false

The following params are available for global control:
    -c --channel:    the number of concurrent tasks, the default value is 1
    -f --file:       existing complete dataX configuration file path
    -t --type:       test read or write performance for a datasource, could be reader or writer, in collaboration with -f --file
    -h --help:       print help message

some demo:
    perftrace.py --channel=10 --reader='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "where":"", "splitPk":"", "writer-print":"false"}'
    perftrace.py --channel=10 --writer='{"jdbcUrl":"jdbc:mysql://127.0.0.1:3306/database", "username":"", "password":"", "table": "", "reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'
    perftrace.py --file=/tmp/datax.job.json --type=reader --reader='{"writer-print": "false"}'
    perftrace.py --file=/tmp/datax.job.json --type=writer --writer='{"reader-sliceRecordCount": "10000", "reader-column": [{"type":"string","value":"abc"},{"type":"string","random":"10,20"}]}'

some example jdbc url pattern, may help:
    jdbc:oracle:thin:@ip:port:database
    jdbc:mysql://ip:port/database
    jdbc:sqlserver://ip:port;DatabaseName=database
    jdbc:postgresql://ip:port/database
    warn: ads url pattern is ip:port
    warn: test write performance will write data into your table, you can use a temporary table just for test.
'''


def printCopyright():
    DATAX_VERSION = 'UNKNOWN_DATAX_VERSION'
    print('''
DataX Util Tools (%s), From Alibaba !
Copyright (C) 2010-2016, Alibaba Group. All Rights Reserved.''' % DATAX_VERSION)
    sys.stdout.flush()


def yesNoChoice():
    yes = set(['yes', 'y', 'ye', ''])
    no = set(['no', 'n'])
    choice = input().lower()
    if choice in yes:
        return True
    elif choice in no:
        return False
    else:
        sys.stdout.write("Please respond with 'yes' or 'no'")
##end cli & help logic


##begin process logic
def suicide(signum, e):
    global childProcess
    print("[Error] Receive unexpected signal %d, starts to suicide." % (signum), file=sys.stderr)
    if childProcess:
        childProcess.send_signal(signal.SIGQUIT)
        time.sleep(1)
        childProcess.kill()
    print("DataX process was killed! Did you do it?", file=sys.stderr)
    sys.exit(-1)


def registerSignal():
    global childProcess
    signal.signal(2, suicide)
    signal.signal(3, suicide)
    signal.signal(15, suicide)


def fork(command, isShell=False):
    global childProcess
    childProcess = subprocess.Popen(command, shell = isShell)
    registerSignal()
    (stdout, stderr) = childProcess.communicate()
    # block until the child process exits
    childProcess.wait()
    return childProcess.returncode
##end process logic


##begin datax json generate logic
#warn: if not '': -> true; if not None: -> true
def notNone(obj, context):
    if not obj:
        raise Exception("Configuration property [%s] could not be blank!" % (context))


def attributeNotNone(obj, attributes):
    for key in attributes:
        notNone(obj.get(key), key)


def isBlank(value):
    if value is None or len(value.strip()) == 0:
        return True
    return False


def parsePluginName(jdbcUrl, pluginType):
    import re
    #warn: drds
    name = 'pluginName'
    mysqlRegex = re.compile('jdbc:(mysql)://.*')
    if (mysqlRegex.match(jdbcUrl)):
        name = 'mysql'
    postgresqlRegex = re.compile('jdbc:(postgresql)://.*')
    if (postgresqlRegex.match(jdbcUrl)):
        name = 'postgresql'
    oracleRegex = re.compile('jdbc:(oracle):.*')
    if (oracleRegex.match(jdbcUrl)):
        name = 'oracle'
    sqlserverRegex = re.compile('jdbc:(sqlserver)://.*')
    if (sqlserverRegex.match(jdbcUrl)):
        name = 'sqlserver'
    db2Regex = re.compile('jdbc:(db2)://.*')
    if (db2Regex.match(jdbcUrl)):
        name = 'db2'
    return "%s%s" % (name, pluginType)


def renderDataXJson(paramsDict, readerOrWriter = 'reader', channel = 1):
    dataxTemplate = {
        "job": {
            "setting": {
                "speed": {
                    "channel": 1
                }
            },
            "content": [
                {
                    "reader": {
                        "name": "",
                        "parameter": {
                            "username": "",
                            "password": "",
                            "sliceRecordCount": "10000",
                            "column": [
                                "*"
                            ],
                            "connection": [
                                {
                                    "table": [],
                                    "jdbcUrl": []
                                }
                            ]
                        }
                    },
                    "writer": {
                        "name": "",
                        "parameter": {
                            "print": "false",
                            "connection": [
                                {
                                    "table": [],
                                    "jdbcUrl": ''
                                }
                            ]
                        }
                    }
                }
            ]
        }
    }
    dataxTemplate['job']['setting']['speed']['channel'] = channel
    dataxTemplateContent = dataxTemplate['job']['content'][0]

    pluginName = ''
    if paramsDict.get('datasourceType'):
        pluginName = '%s%s' % (paramsDict['datasourceType'], readerOrWriter)
    elif paramsDict.get('jdbcUrl'):
        pluginName = parsePluginName(paramsDict['jdbcUrl'], readerOrWriter)
    elif paramsDict.get('url'):
        pluginName = 'adswriter'

    theOtherSide = 'writer' if readerOrWriter == 'reader' else 'reader'
    dataxPluginParamsContent = dataxTemplateContent.get(readerOrWriter).get('parameter')
    dataxPluginParamsContent.update(paramsDict)

    dataxPluginParamsContentOtherSide = dataxTemplateContent.get(theOtherSide).get('parameter')

    if readerOrWriter == 'reader':
        dataxTemplateContent.get('reader')['name'] = pluginName
        dataxTemplateContent.get('writer')['name'] = 'streamwriter'
        if paramsDict.get('writer-print'):
            dataxPluginParamsContentOtherSide['print'] = paramsDict['writer-print']
            del dataxPluginParamsContent['writer-print']
        del dataxPluginParamsContentOtherSide['connection']
    if readerOrWriter == 'writer':
        dataxTemplateContent.get('reader')['name'] = 'streamreader'
        dataxTemplateContent.get('writer')['name'] = pluginName
        if paramsDict.get('reader-column'):
            dataxPluginParamsContentOtherSide['column'] = paramsDict['reader-column']
            del dataxPluginParamsContent['reader-column']
        if paramsDict.get('reader-sliceRecordCount'):
            dataxPluginParamsContentOtherSide['sliceRecordCount'] = paramsDict['reader-sliceRecordCount']
            del dataxPluginParamsContent['reader-sliceRecordCount']
        del dataxPluginParamsContentOtherSide['connection']

    if paramsDict.get('jdbcUrl'):
        if readerOrWriter == 'reader':
            dataxPluginParamsContent['connection'][0]['jdbcUrl'].append(paramsDict['jdbcUrl'])
        else:
            dataxPluginParamsContent['connection'][0]['jdbcUrl'] = paramsDict['jdbcUrl']
    if paramsDict.get('table'):
        dataxPluginParamsContent['connection'][0]['table'].append(paramsDict['table'])

    traceJobJson = json.dumps(dataxTemplate, indent = 4)
    return traceJobJson


def isUrl(path):
    if not path:
        return False
    if not isinstance(path, str):
        raise Exception('Configuration file path must be a string, but what you configured is: %s' % path)
    m = re.match(r"^http[s]?://\S+\w*", path.lower())
    if m:
        return True
    else:
        return False


def readJobJsonFromLocal(jobConfigPath):
    jobConfigContent = None
    jobConfigPath = os.path.abspath(jobConfigPath)
    file = open(jobConfigPath)
    try:
        jobConfigContent = file.read()
    finally:
        file.close()
    if not jobConfigContent:
        raise Exception("Reading your job configuration file returned an empty result, please check that the configuration is legal, path: [%s]\nconfiguration:\n%s" % (jobConfigPath, str(jobConfigContent)))
    return jobConfigContent


def readJobJsonFromRemote(jobConfigPath):
    import urllib.request, urllib.parse, urllib.error
    conn = urllib.request.urlopen(jobConfigPath)
    jobJson = conn.read()
    return jobJson


def parseJson(strConfig, context):
    try:
        return json.loads(strConfig)
    except Exception as e:
        import traceback
        traceback.print_exc()
        sys.stdout.flush()
        print('%s %s needs to be valid json syntax' % (context, strConfig), file=sys.stderr)
        sys.exit(-1)


def convert(options, args):
    traceJobJson = ''
    if options.file:
        if isUrl(options.file):
            traceJobJson = readJobJsonFromRemote(options.file)
        else:
            traceJobJson = readJobJsonFromLocal(options.file)
        traceJobDict = parseJson(traceJobJson, '%s content' % options.file)
        attributeNotNone(traceJobDict, ['job'])
        attributeNotNone(traceJobDict['job'], ['content'])
        attributeNotNone(traceJobDict['job']['content'][0], ['reader', 'writer'])
        attributeNotNone(traceJobDict['job']['content'][0]['reader'], ['name', 'parameter'])
        attributeNotNone(traceJobDict['job']['content'][0]['writer'], ['name', 'parameter'])
        if options.type == 'reader':
            traceJobDict['job']['content'][0]['writer']['name'] = 'streamwriter'
            if options.reader:
                traceReaderDict = parseJson(options.reader, 'reader config')
                if traceReaderDict.get('writer-print') is not None:
                    traceJobDict['job']['content'][0]['writer']['parameter']['print'] = traceReaderDict.get('writer-print')
                else:
                    traceJobDict['job']['content'][0]['writer']['parameter']['print'] = 'false'
            else:
                traceJobDict['job']['content'][0]['writer']['parameter']['print'] = 'false'
        elif options.type == 'writer':
            traceJobDict['job']['content'][0]['reader']['name'] = 'streamreader'
            if options.writer:
                traceWriterDict = parseJson(options.writer, 'writer config')
                if traceWriterDict.get('reader-column'):
                    traceJobDict['job']['content'][0]['reader']['parameter']['column'] = traceWriterDict['reader-column']
                if traceWriterDict.get('reader-sliceRecordCount'):
                    traceJobDict['job']['content'][0]['reader']['parameter']['sliceRecordCount'] = traceWriterDict['reader-sliceRecordCount']
            else:
                columnSize = len(traceJobDict['job']['content'][0]['writer']['parameter']['column'])
                streamReaderColumn = []
                for i in range(columnSize):
                    streamReaderColumn.append({"type": "long", "random": "2,10"})
                traceJobDict['job']['content'][0]['reader']['parameter']['column'] = streamReaderColumn
                traceJobDict['job']['content'][0]['reader']['parameter']['sliceRecordCount'] = 10000
        else:
            pass  # do nothing
        return json.dumps(traceJobDict, indent = 4)
    elif options.reader:
        traceReaderDict = parseJson(options.reader, 'reader config')
        return renderDataXJson(traceReaderDict, 'reader', options.channel)
    elif options.writer:
        traceWriterDict = parseJson(options.writer, 'writer config')
        return renderDataXJson(traceWriterDict, 'writer', options.channel)
    else:
        print(getUsage())
        sys.exit(-1)
    #dataxParams = {}
    #for opt, value in options.__dict__.items():
    #    dataxParams[opt] = value
##end datax json generate logic
if __name__ == "__main__":
|
||||
printCopyright()
|
||||
parser = getOptionParser()
|
||||
|
||||
options, args = parser.parse_args(sys.argv[1:])
|
||||
#print options, args
|
||||
dataxTraceJobJson = convert(options, args)
|
||||
|
||||
#由MAC地址、当前时间戳、随机数生成,可以保证全球范围内的唯一性
|
||||
dataxJobPath = os.path.join(os.getcwd(), "perftrace-" + str(uuid.uuid1()))
|
||||
jobConfigOk = True
|
||||
if os.path.exists(dataxJobPath):
|
||||
print("file already exists, truncate and rewrite it? %s" % dataxJobPath)
|
||||
if yesNoChoice():
|
||||
jobConfigOk = True
|
||||
else:
|
||||
print("exit failed, because of file conflict")
|
||||
sys.exit(-1)
|
||||
fileWriter = open(dataxJobPath, 'w')
|
||||
fileWriter.write(dataxTraceJobJson)
|
||||
fileWriter.close()
|
||||
|
||||
|
||||
print("trace environments:")
|
||||
print("dataxJobPath: %s" % dataxJobPath)
|
||||
dataxHomePath = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
|
||||
print("dataxHomePath: %s" % dataxHomePath)
|
||||
|
||||
dataxCommand = "%s %s" % (os.path.join(dataxHomePath, "bin", "datax.py"), dataxJobPath)
|
||||
print("dataxCommand: %s" % dataxCommand)
|
||||
|
||||
returncode = fork(dataxCommand, True)
|
||||
if options.delete == 'true':
|
||||
os.remove(dataxJobPath)
|
||||
sys.exit(returncode)
|
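
# Illustrative sketch (not part of perftrace_py3.py above): how convert() uses
# renderDataXJson() for the --reader case. The connection details below are
# placeholders, and the import assumes the script is reachable on sys.path as
# the module perftrace_py3.
from perftrace_py3 import renderDataXJson

readerParams = {
    "jdbcUrl": "jdbc:mysql://127.0.0.1:3306/demo",  # placeholder connection string
    "username": "demo_user",                        # placeholder credentials
    "password": "demo_pass",
    "table": "demo_table",                          # placeholder table
    "where": "id > 0",
    "writer-print": "false",                        # keep the stream writer quiet
}
# prints a complete DataX job json pairing a mysqlreader with a streamwriter
print(renderDataXJson(readerParams, 'reader', channel=2))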