mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 09:00:52 +08:00
180 lines
6.5 KiB
Python
180 lines
6.5 KiB
Python
#!/bin/usr/env python
|
|
#-*- coding: utf-8 -*-
|
|
|
|
from optparse import OptionParser
|
|
import sys
|
|
import json
|
|
import tabulate
|
|
import zlib
|
|
from ots2 import *
|
|
|
|
class ConsoleConfig:
|
|
def __init__(self, config_file):
|
|
f = open(config_file, 'r')
|
|
config = json.loads(f.read())
|
|
self.endpoint = str(config['endpoint'])
|
|
self.accessid = str(config['accessId'])
|
|
self.accesskey = str(config['accessKey'])
|
|
self.instance_name = str(config['instanceName'])
|
|
self.status_table = str(config['statusTable'])
|
|
|
|
self.ots = OTSClient(self.endpoint, self.accessid, self.accesskey, self.instance_name)
|
|
|
|
def describe_job(config, options):
|
|
'''
|
|
1. get job's description
|
|
2. get all job's checkpoints and check if it is done
|
|
'''
|
|
if not options.stream_id:
|
|
print "Error: Should set the stream id using '-s' or '--streamid'."
|
|
sys.exit(-1)
|
|
|
|
if not options.timestamp:
|
|
print "Error: Should set the timestamp using '-t' or '--timestamp'."
|
|
sys.exit(-1)
|
|
|
|
pk = [('StreamId', options.stream_id), ('StatusType', 'DataxJobDesc'), ('StatusValue', '%16d' % int(options.timestamp))]
|
|
consumed, pk, attrs, next_token = config.ots.get_row(config.status_table, pk, [], None, 1)
|
|
if not attrs:
|
|
print 'Stream job is not found.'
|
|
sys.exit(-1)
|
|
|
|
job_detail = parse_job_detail(attrs)
|
|
print '----------JobDescriptions----------'
|
|
print json.dumps(job_detail, indent=2)
|
|
print '-----------------------------------'
|
|
|
|
stream_checkpoints = _list_checkpoints(config, options.stream_id, int(options.timestamp))
|
|
|
|
cps_headers = ['ShardId', 'SendRecordCount', 'Checkpoint', 'SkipCount', 'Version']
|
|
table_content = []
|
|
for cp in stream_checkpoints:
|
|
table_content.append([cp['ShardId'], cp['SendRecordCount'], cp['Checkpoint'], cp['SkipCount'], cp['Version']])
|
|
|
|
print tabulate.tabulate(table_content, headers=cps_headers)
|
|
|
|
# check if stream job has finished
|
|
finished = True
|
|
if len(job_detail['ShardIds']) != len(stream_checkpoints):
|
|
finished = False
|
|
|
|
for cp in stream_checkpoints:
|
|
if cp['Version'] != job_detail['Version']:
|
|
finished = False
|
|
|
|
print '----------JobSummary----------'
|
|
print 'ShardsCount:', len(job_detail['ShardIds'])
|
|
print 'CheckPointsCount:', len(stream_checkpoints)
|
|
print 'JobStatus:', 'Finished' if finished else 'NotFinished'
|
|
print '------------------------------'
|
|
|
|
def _list_checkpoints(config, stream_id, timestamp):
|
|
start_pk = [('StreamId', stream_id), ('StatusType', 'CheckpointForDataxReader'), ('StatusValue', '%16d' % timestamp)]
|
|
end_pk = [('StreamId', stream_id), ('StatusType', 'CheckpointForDataxReader'), ('StatusValue', '%16d' % (timestamp + 1))]
|
|
|
|
consumed_counter = CapacityUnit(0, 0)
|
|
columns_to_get = []
|
|
checkpoints = []
|
|
range_iter = config.ots.xget_range(
|
|
config.status_table, Direction.FORWARD,
|
|
start_pk, end_pk,
|
|
consumed_counter, columns_to_get, 100,
|
|
column_filter=None, max_version=1
|
|
)
|
|
|
|
rows = []
|
|
for (primary_key, attrs) in range_iter:
|
|
checkpoint = {}
|
|
for attr in attrs:
|
|
checkpoint[attr[0]] = attr[1]
|
|
|
|
if not checkpoint.has_key('SendRecordCount'):
|
|
checkpoint['SendRecordCount'] = 0
|
|
checkpoint['ShardId'] = primary_key[2][1].split('\t')[1]
|
|
checkpoints.append(checkpoint)
|
|
|
|
return checkpoints
|
|
|
|
def list_job(config, options):
|
|
'''
|
|
Two options:
|
|
1. list all jobs of stream
|
|
2. list all jobs and all streams
|
|
'''
|
|
consumed_counter = CapacityUnit(0, 0)
|
|
|
|
if options.stream_id:
|
|
start_pk = [('StreamId', options.stream_id), ('StatusType', INF_MIN), ('StatusValue', INF_MIN)]
|
|
end_pk = [('StreamId', options.stream_id), ('StatusType', INF_MAX), ('StatusValue', INF_MAX)]
|
|
else:
|
|
start_pk = [('StreamId', INF_MIN), ('StatusType', INF_MIN), ('StatusValue', INF_MIN)]
|
|
end_pk = [('StreamId', INF_MAX), ('StatusType', INF_MAX), ('StatusValue', INF_MAX)]
|
|
|
|
columns_to_get = []
|
|
range_iter = config.ots.xget_range(
|
|
config.status_table, Direction.FORWARD,
|
|
start_pk, end_pk,
|
|
consumed_counter, columns_to_get, None,
|
|
column_filter=None, max_version=1
|
|
)
|
|
|
|
rows = []
|
|
for (primary_key, attrs) in range_iter:
|
|
if primary_key[1][1] == 'DataxJobDesc':
|
|
job_detail = parse_job_detail(attrs)
|
|
rows.append([job_detail['TableName'], job_detail['JobStreamId'], job_detail['EndTime'], job_detail['StartTime'], job_detail['EndTime'], job_detail['Version']])
|
|
|
|
headers = ['TableName', 'JobStreamId', 'Timestamp', 'StartTime', 'EndTime', 'Version']
|
|
print tabulate.tabulate(rows, headers=headers)
|
|
|
|
def parse_job_detail(attrs):
|
|
job_details = {}
|
|
shard_ids_content = ''
|
|
for attr in attrs:
|
|
if attr[0].startswith('ShardIds_'):
|
|
shard_ids_content += attr[1]
|
|
else:
|
|
job_details[attr[0]] = attr[1]
|
|
|
|
shard_ids = json.loads(zlib.decompress(shard_ids_content))
|
|
|
|
if not job_details.has_key('Version'):
|
|
job_details['Version'] = ''
|
|
|
|
if not job_details.has_key('SkipCount'):
|
|
job_details['SkipCount'] = 0
|
|
job_details['ShardIds'] = shard_ids
|
|
|
|
return job_details
|
|
|
|
def parse_time(value):
|
|
try:
|
|
return int(value)
|
|
except Exception,e:
|
|
return int(time.mktime(time.strptime(value, '%Y-%m-%d %H:%M:%S')))
|
|
|
|
if __name__ == '__main__':
|
|
parser = OptionParser()
|
|
parser.add_option('-c', '--config', dest='config_file', help='path of config file', metavar='tablestore_streamreader_config.json')
|
|
parser.add_option('-a', '--action', dest='action', help='the action to do', choices = ['describe_job', 'list_job'], metavar='')
|
|
parser.add_option('-t', '--timestamp', dest='timestamp', help='the timestamp', metavar='')
|
|
parser.add_option('-s', '--streamid', dest='stream_id', help='the id of stream', metavar='')
|
|
parser.add_option('-d', '--shardid', dest='shard_id', help='the id of shard', metavar='')
|
|
|
|
options, args = parser.parse_args()
|
|
|
|
if not options.config_file:
|
|
print "Error: Should set the path of config file using '-c' or '--config'."
|
|
sys.exit(-1)
|
|
|
|
if not options.action:
|
|
print "Error: Should set the action using '-a' or '--action'."
|
|
sys.exit(-1)
|
|
|
|
console_config = ConsoleConfig(options.config_file)
|
|
if options.action == 'list_job':
|
|
list_job(console_config, options)
|
|
elif options.action == 'describe_job':
|
|
describe_job(console_config, options)
|
|
|