Mirror of https://github.com/alibaba/DataX.git, synced 2025-05-02 11:11:08 +08:00

Merge pull request #23 from taosdata/test/TS-1218
[TS-1218] test for t2dm migration
Commit bcbe242a29
@@ -16,8 +16,6 @@ import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class TDengineReader extends Reader {

@@ -129,6 +127,15 @@ public class TDengineReader extends Reader {
        private String startTime;
        private String endTime;

        // register both the native and the RESTful TDengine JDBC drivers up front
        static {
            try {
                Class.forName("com.taosdata.jdbc.TSDBDriver");
                Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
            } catch (ClassNotFoundException e) {
                e.printStackTrace();
            }
        }

        @Override
        public void init() {
            this.readerSliceConfig = super.getPluginJobConf();
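The static initializer added here makes sure both TDengine JDBC drivers are registered before any connection is opened. As an illustration of what that enables, here is a minimal connection sketch; the host, database, and credentials are borrowed from this commit's test setup, and server_status() is used as a lightweight probe, so treat it as an assumption-laden sketch rather than plugin code:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class ConnectionCheck {
    public static void main(String[] args) throws Exception {
        // RESTful URL scheme; the native driver would take jdbc:TAOS://host:6030/db2 instead
        String url = "jdbc:TAOS-RS://192.168.1.93:6041/db2";
        try (Connection conn = DriverManager.getConnection(url, "root", "taosdata");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("select server_status()")) {
            while (rs.next()) {
                System.out.println("server_status: " + rs.getInt(1));
            }
        }
    }
}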
@@ -239,32 +246,6 @@ public class TDengineReader extends Reader {
            }
        }

        // convert an interval string such as "1d", "2h", "30m" or "45s" into milliseconds
        private static Long parseSplitInterval(String splitInterval) throws Exception {
            final long second = 1000;
            final long minute = 60 * second;
            final long hour = 60 * minute;
            final long day = 24 * hour;

            Pattern compile = Pattern.compile("^(\\d+)([dhms])$");
            Matcher matcher = compile.matcher(splitInterval);
            while (matcher.find()) {
                long value = Long.parseLong(matcher.group(1));
                if (value == 0)
                    throw new Exception("invalid splitInterval: 0");
                char unit = matcher.group(2).charAt(0);
                switch (unit) {
                    case 'd':
                        return value * day;
                    case 'h':
                        return value * hour;
                    case 'm':
                        return value * minute;
                    case 's':
                        return value * second;
                    default:
                        throw new Exception("invalid splitInterval: " + splitInterval);
                }
            }
            throw new Exception("invalid splitInterval: " + splitInterval);
        }

    }
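For reference, the removed parseSplitInterval helper maps suffixed interval strings to millisecond counts. A standalone restatement of its arithmetic with a few illustrative values (the class name here is hypothetical, not part of the commit):

// quick sanity check of the conversion rules shown above
public class SplitIntervalExample {
    public static void main(String[] args) {
        final long second = 1000;
        final long minute = 60 * second;
        final long hour = 60 * minute;
        final long day = 24 * hour;
        System.out.println("1d  -> " + 1 * day);     // 86400000 ms
        System.out.println("2h  -> " + 2 * hour);    // 7200000 ms
        System.out.println("30m -> " + 30 * minute); // 1800000 ms
        System.out.println("45s -> " + 45 * second); // 45000 ms
    }
}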
@@ -1,9 +0,0 @@
#!/bin/bash

datax_home_dir=$(dirname $(readlink -f "$0"))

# 'cm9vdDp0YW9zZGF0YQ==' is base64 for the default credentials root:taosdata
curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'drop table if exists db2.stb2;' 192.168.1.93:6041/rest/sql
curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'create table if not exists db2.stb2 (`ts` TIMESTAMP,`f2` SMALLINT,`f4` BIGINT,`f5` FLOAT,`f6` DOUBLE,`f7` DOUBLE,`f8` BOOL,`f9` NCHAR(100),`f10` NCHAR(200)) TAGS (`f1` TINYINT,`f3` INT);' 192.168.1.93:6041/rest/sql

rm -f ${datax_home_dir}/log/*
rm -f ${datax_home_dir}/job/*.csv
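This deleted script, evidently clean_env.sh going by the scp list at the end of the diff, resets the test environment: it drops and recreates super table db2.stb2 through TDengine's REST SQL endpoint and clears DataX logs and checkpoint files. The same REST call can be issued from Java; a minimal sketch assuming the same host and credentials:

import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.nio.charset.StandardCharsets;
import java.util.Base64;

public class RestSqlExample {
    public static void main(String[] args) throws Exception {
        HttpURLConnection conn =
                (HttpURLConnection) new URL("http://192.168.1.93:6041/rest/sql").openConnection();
        conn.setRequestMethod("POST");
        String token = Base64.getEncoder()
                .encodeToString("root:taosdata".getBytes(StandardCharsets.UTF_8));
        conn.setRequestProperty("Authorization", "Basic " + token);
        conn.setDoOutput(true);
        try (OutputStream out = conn.getOutputStream()) {
            // the SQL statement travels as the raw request body
            out.write("drop table if exists db2.stb2;".getBytes(StandardCharsets.UTF_8));
        }
        System.out.println("HTTP " + conn.getResponseCode());
    }
}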
@@ -1,63 +0,0 @@
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "rdbmsreader",
          "parameter": {
            "username": "TESTUSER",
            "password": "test123456",
            "connection": [
              {
                "querySql": [
                  "select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1"
                ],
                "jdbcUrl": [
                  "jdbc:dm://192.168.0.72:5236"
                ]
              }
            ],
            "where": "1=1",
            "fetchSize": 1024
          }
        },
        "writer": {
          "name": "tdenginewriter",
          "parameter": {
            "username": "root",
            "password": "taosdata",
            "column": [
              "tbname",
              "ts",
              "f1",
              "f2",
              "f3",
              "f4",
              "f5",
              "f6",
              "f7",
              "f8",
              "f9",
              "f10"
            ],
            "connection": [
              {
                "table": [
                  "stb2"
                ],
                "jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
              }
            ],
            "batchSize": 1000,
            "ignoreTagsUnmatched": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
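This deleted job file, apparently dm2t-update.json given the sed call in the sync script below, moves data from a DM (Dameng) database into TDengine: rdbmsreader runs the querySql against jdbc:dm://192.168.0.72:5236 and derives a tbname column from the tag columns, so a row with f1=1 and f3=2 is routed to sub-table t1_2, which tdenginewriter creates under super table stb2. The "where": "1=1" entry is a placeholder; an incremental run rewrites it to a time window, e.g. (timestamps illustrative):

    "where": "ts >= '2022-04-01 10:00:00' and ts < '2022-04-01 10:05:00'"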
@@ -1,57 +0,0 @@
#!/bin/bash

set -e
#set -x

datax_home_dir=$(dirname $(readlink -f "$0"))
table_name="stb1"
update_key="ts"

while getopts "hd:t:k:" arg; do
    case $arg in
        d)
            datax_home_dir=$OPTARG
            ;;
        t)
            table_name=$OPTARG
            ;;
        k)
            update_key=$OPTARG
            ;;
        h)
            echo "Usage: $(basename $0) -d [datax_home_dir] -t [table_name] -k [update_key]"
            echo "       -h help"
            exit 0
            ;;
        ?) #unknown option
            echo "unknown argument"
            exit 1
            ;;
    esac
done

if [[ -e ${datax_home_dir}/job/${table_name}.csv ]]; then
    MAX_TIME=$(cat ${datax_home_dir}/job/${table_name}.csv)
else
    MAX_TIME="null"
fi
current_datetime=$(date +"%Y-%m-%d %H:%M:%S")
current_timestamp=$(date +%s)

if [ "$MAX_TIME" != "null" ]; then
    # incremental run: narrow the reader's time window to [last checkpoint, now)
    WHERE="${update_key} >= '$MAX_TIME' and ${update_key} < '$current_datetime'"
    sed "s/1=1/$WHERE/g" ${datax_home_dir}/job/dm2t-update.json >${datax_home_dir}/job/dm2t_${current_timestamp}.json
    echo "incremental data synchronization, from '${MAX_TIME}' to '${current_datetime}'"
    python ${datax_home_dir}/bin/datax.py ${datax_home_dir}/job/dm2t_${current_timestamp}.json 1> /dev/null 2>&1 && job_ok=1 || job_ok=0
else
    echo "full data synchronization, to '${current_datetime}'"
    python ${datax_home_dir}/bin/datax.py ${datax_home_dir}/job/dm2t-update.json 1> /dev/null 2>&1 && job_ok=1 || job_ok=0
fi

if [[ ${job_ok} -ne 1 ]]; then
    echo "datax migration job failed"
else
    # persist the checkpoint so the next run starts from this point in time
    echo ${current_datetime} >$datax_home_dir/job/${table_name}.csv
    echo "datax migration job succeeded"
fi

rm -rf ${datax_home_dir}/job/dm2t_${current_timestamp}.json

#while true; do ./dm2t_sync.sh; sleep 5s; done
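The sync script implements a simple checkpoint scheme: the newest synchronized timestamp lives in job/<table_name>.csv, the first run performs a full copy, and each later run generates a one-off job covering the window since the last checkpoint, advancing the checkpoint only on success. A typical invocation would be ./dm2t_sync.sh -d /opt/datax -t stb1 -k ts (the install path is illustrative), and the commented-out loop on the last line shows how the test drove repeated incremental runs.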
@@ -1,5 +0,0 @@
#!/bin/bash

scp dm2t-update.json root@192.168.56.105:/root/workspace/tmp/datax/job
scp dm2t_sync.sh root@192.168.56.105:/root/workspace/tmp/datax
scp clean_env.sh root@192.168.56.105:/root/workspace/tmp/datax