DataX/otsstreamreader/tools/tablestore_streamreader_console.py
2018-01-31 16:24:49 +08:00

180 lines
6.5 KiB
Python

#!/bin/usr/env python
#-*- coding: utf-8 -*-
from optparse import OptionParser
import sys
import json
import tabulate
import zlib
from ots2 import *
class ConsoleConfig:
def __init__(self, config_file):
f = open(config_file, 'r')
config = json.loads(f.read())
self.endpoint = str(config['endpoint'])
self.accessid = str(config['accessId'])
self.accesskey = str(config['accessKey'])
self.instance_name = str(config['instanceName'])
self.status_table = str(config['statusTable'])
self.ots = OTSClient(self.endpoint, self.accessid, self.accesskey, self.instance_name)
def describe_job(config, options):
'''
1. get job's description
2. get all job's checkpoints and check if it is done
'''
if not options.stream_id:
print "Error: Should set the stream id using '-s' or '--streamid'."
sys.exit(-1)
if not options.timestamp:
print "Error: Should set the timestamp using '-t' or '--timestamp'."
sys.exit(-1)
pk = [('StreamId', options.stream_id), ('StatusType', 'DataxJobDesc'), ('StatusValue', '%16d' % int(options.timestamp))]
consumed, pk, attrs, next_token = config.ots.get_row(config.status_table, pk, [], None, 1)
if not attrs:
print 'Stream job is not found.'
sys.exit(-1)
job_detail = parse_job_detail(attrs)
print '----------JobDescriptions----------'
print json.dumps(job_detail, indent=2)
print '-----------------------------------'
stream_checkpoints = _list_checkpoints(config, options.stream_id, int(options.timestamp))
cps_headers = ['ShardId', 'SendRecordCount', 'Checkpoint', 'SkipCount', 'Version']
table_content = []
for cp in stream_checkpoints:
table_content.append([cp['ShardId'], cp['SendRecordCount'], cp['Checkpoint'], cp['SkipCount'], cp['Version']])
print tabulate.tabulate(table_content, headers=cps_headers)
# check if stream job has finished
finished = True
if len(job_detail['ShardIds']) != len(stream_checkpoints):
finished = False
for cp in stream_checkpoints:
if cp['Version'] != job_detail['Version']:
finished = False
print '----------JobSummary----------'
print 'ShardsCount:', len(job_detail['ShardIds'])
print 'CheckPointsCount:', len(stream_checkpoints)
print 'JobStatus:', 'Finished' if finished else 'NotFinished'
print '------------------------------'
def _list_checkpoints(config, stream_id, timestamp):
start_pk = [('StreamId', stream_id), ('StatusType', 'CheckpointForDataxReader'), ('StatusValue', '%16d' % timestamp)]
end_pk = [('StreamId', stream_id), ('StatusType', 'CheckpointForDataxReader'), ('StatusValue', '%16d' % (timestamp + 1))]
consumed_counter = CapacityUnit(0, 0)
columns_to_get = []
checkpoints = []
range_iter = config.ots.xget_range(
config.status_table, Direction.FORWARD,
start_pk, end_pk,
consumed_counter, columns_to_get, 100,
column_filter=None, max_version=1
)
rows = []
for (primary_key, attrs) in range_iter:
checkpoint = {}
for attr in attrs:
checkpoint[attr[0]] = attr[1]
if not checkpoint.has_key('SendRecordCount'):
checkpoint['SendRecordCount'] = 0
checkpoint['ShardId'] = primary_key[2][1].split('\t')[1]
checkpoints.append(checkpoint)
return checkpoints
def list_job(config, options):
'''
Two options:
1. list all jobs of stream
2. list all jobs and all streams
'''
consumed_counter = CapacityUnit(0, 0)
if options.stream_id:
start_pk = [('StreamId', options.stream_id), ('StatusType', INF_MIN), ('StatusValue', INF_MIN)]
end_pk = [('StreamId', options.stream_id), ('StatusType', INF_MAX), ('StatusValue', INF_MAX)]
else:
start_pk = [('StreamId', INF_MIN), ('StatusType', INF_MIN), ('StatusValue', INF_MIN)]
end_pk = [('StreamId', INF_MAX), ('StatusType', INF_MAX), ('StatusValue', INF_MAX)]
columns_to_get = []
range_iter = config.ots.xget_range(
config.status_table, Direction.FORWARD,
start_pk, end_pk,
consumed_counter, columns_to_get, None,
column_filter=None, max_version=1
)
rows = []
for (primary_key, attrs) in range_iter:
if primary_key[1][1] == 'DataxJobDesc':
job_detail = parse_job_detail(attrs)
rows.append([job_detail['TableName'], job_detail['JobStreamId'], job_detail['EndTime'], job_detail['StartTime'], job_detail['EndTime'], job_detail['Version']])
headers = ['TableName', 'JobStreamId', 'Timestamp', 'StartTime', 'EndTime', 'Version']
print tabulate.tabulate(rows, headers=headers)
def parse_job_detail(attrs):
job_details = {}
shard_ids_content = ''
for attr in attrs:
if attr[0].startswith('ShardIds_'):
shard_ids_content += attr[1]
else:
job_details[attr[0]] = attr[1]
shard_ids = json.loads(zlib.decompress(shard_ids_content))
if not job_details.has_key('Version'):
job_details['Version'] = ''
if not job_details.has_key('SkipCount'):
job_details['SkipCount'] = 0
job_details['ShardIds'] = shard_ids
return job_details
def parse_time(value):
try:
return int(value)
except Exception,e:
return int(time.mktime(time.strptime(value, '%Y-%m-%d %H:%M:%S')))
if __name__ == '__main__':
parser = OptionParser()
parser.add_option('-c', '--config', dest='config_file', help='path of config file', metavar='tablestore_streamreader_config.json')
parser.add_option('-a', '--action', dest='action', help='the action to do', choices = ['describe_job', 'list_job'], metavar='')
parser.add_option('-t', '--timestamp', dest='timestamp', help='the timestamp', metavar='')
parser.add_option('-s', '--streamid', dest='stream_id', help='the id of stream', metavar='')
parser.add_option('-d', '--shardid', dest='shard_id', help='the id of shard', metavar='')
options, args = parser.parse_args()
if not options.config_file:
print "Error: Should set the path of config file using '-c' or '--config'."
sys.exit(-1)
if not options.action:
print "Error: Should set the action using '-a' or '--action'."
sys.exit(-1)
console_config = ConsoleConfig(options.config_file)
if options.action == 'list_job':
list_job(console_config, options)
elif options.action == 'describe_job':
describe_job(console_config, options)