mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 17:40:28 +08:00
[TS-1219]<feature>: incremental synchronization scripts of datax
This commit is contained in:
parent
9a51e665e5
commit
740bf010e8
@ -37,8 +37,7 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。
|
|||||||
"phase"
|
"phase"
|
||||||
],
|
],
|
||||||
"beginDateTime": "2017-07-14 10:40:00",
|
"beginDateTime": "2017-07-14 10:40:00",
|
||||||
"endDateTime": "2017-08-14 10:40:00",
|
"endDateTime": "2017-08-14 10:40:00"
|
||||||
"splitInterval": "1d"
|
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
"writer": {
|
"writer": {
|
||||||
@ -79,17 +78,12 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。
|
|||||||
* 必选:是 <br />
|
* 必选:是 <br />
|
||||||
* 默认值:无<br />
|
* 默认值:无<br />
|
||||||
* **beginDateTime**
|
* **beginDateTime**
|
||||||
* 描述:数据的开始时间,Job迁移从begineDateTime到endDateTime的数据,格式为yyyy-MM-dd HH:mm:ss <br />
|
* 描述:数据的开始时间,Job迁移从begineDateTime到endDateTime的数据,格式为yyyy-MM-dd HH:mm:ss,如果不填为全量同步 <br />
|
||||||
* 必选:是 <br />
|
* 必选:否 <br />
|
||||||
* 默认值:无 <br />
|
* 默认值:无 <br />
|
||||||
* **endDateTime**
|
* **endDateTime**
|
||||||
* 描述:数据的结束时间,Job迁移从begineDateTime到endDateTime的数据,格式为yyyy-MM-dd HH:mm:ss <br />
|
* 描述:数据的结束时间,Job迁移从begineDateTime到endDateTime的数据,格式为yyyy-MM-dd HH:mm:ss,如果不填为全量同步 <br />
|
||||||
* 必选:是 <br />
|
* 必选:否 <br />
|
||||||
* 默认值:无 <br />
|
|
||||||
* **splitInterval**
|
|
||||||
* 描述:按照splitInterval来划分task, 每splitInterval创建一个task。例如,20d代表按照每20天的数据划分为1个task。
|
|
||||||
可以配置的时间单位为:d(天), h(小时), m(分钟), s(秒) <br />
|
|
||||||
* 必选:是 <br />
|
|
||||||
* 默认值:无 <br />
|
* 默认值:无 <br />
|
||||||
|
|
||||||
### 3.3 类型转换
|
### 3.3 类型转换
|
||||||
@ -106,7 +100,6 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。
|
|||||||
| BINARY | Bytes |
|
| BINARY | Bytes |
|
||||||
| NCHAR | String |
|
| NCHAR | String |
|
||||||
|
|
||||||
|
|
||||||
## 4 性能报告
|
## 4 性能报告
|
||||||
|
|
||||||
### 4.1 环境准备
|
### 4.1 环境准备
|
@ -29,10 +29,29 @@
|
|||||||
</exclusions>
|
</exclusions>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba</groupId>
|
||||||
|
<artifactId>fastjson</artifactId>
|
||||||
|
<version>1.2.78</version>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.datax.tdenginewriter</groupId>
|
||||||
|
<artifactId>tdenginewriter</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
<scope>compile</scope>
|
||||||
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.taosdata.jdbc</groupId>
|
<groupId>com.taosdata.jdbc</groupId>
|
||||||
<artifactId>taos-jdbcdriver</artifactId>
|
<artifactId>taos-jdbcdriver</artifactId>
|
||||||
<version>2.0.37</version>
|
<version>2.0.37</version>
|
||||||
|
<exclusions>
|
||||||
|
<exclusion>
|
||||||
|
<groupId>com.alibaba</groupId>
|
||||||
|
<artifactId>fastjson</artifactId>
|
||||||
|
</exclusion>
|
||||||
|
</exclusions>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
<dependency>
|
<dependency>
|
||||||
@ -47,6 +66,21 @@
|
|||||||
<version>0.0.1-SNAPSHOT</version>
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
<scope>compile</scope>
|
<scope>compile</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
|
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.alibaba.datax</groupId>
|
||||||
|
<artifactId>datax-core</artifactId>
|
||||||
|
<version>0.0.1-SNAPSHOT</version>
|
||||||
|
<scope>test</scope>
|
||||||
|
</dependency>
|
||||||
|
<!-- 添加 dm8 jdbc jar 包依赖-->
|
||||||
|
<dependency>
|
||||||
|
<groupId>com.dameng</groupId>
|
||||||
|
<artifactId>dm-jdbc</artifactId>
|
||||||
|
<version>1.8</version>
|
||||||
|
<scope>system</scope>
|
||||||
|
<systemPath>${project.basedir}/src/test/resources/DmJdbcDriver18.jar</systemPath>
|
||||||
|
</dependency>
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
<build>
|
<build>
|
||||||
|
@ -1,19 +0,0 @@
|
|||||||
package com.alibaba.datax.plugin.reader;
|
|
||||||
|
|
||||||
public class Key {
|
|
||||||
|
|
||||||
public static final String JDBC_URL = "jdbcUrl";
|
|
||||||
// public static final String HOST = "host";
|
|
||||||
// public static final String PORT = "port";
|
|
||||||
// public static final String DB = "db";
|
|
||||||
public static final String TABLE = "table";
|
|
||||||
public static final String USER = "username";
|
|
||||||
public static final String PASSWORD = "password";
|
|
||||||
public static final String CONNECTION = "connection";
|
|
||||||
// public static final String SQL = "sql";
|
|
||||||
public static final String BEGIN_DATETIME = "beginDateTime";
|
|
||||||
public static final String END_DATETIME = "endDateTime";
|
|
||||||
public static final String SPLIT_INTERVAL = "splitInterval";
|
|
||||||
public static final String COLUMN = "column";
|
|
||||||
public static final String MANDATORY_ENCODING = "mandatoryEncoding";
|
|
||||||
}
|
|
@ -1,13 +1,11 @@
|
|||||||
package com.alibaba.datax.plugin.reader;
|
package com.alibaba.datax.plugin.reader;
|
||||||
|
|
||||||
import com.alibaba.datax.common.constant.CommonConstant;
|
|
||||||
import com.alibaba.datax.common.element.*;
|
import com.alibaba.datax.common.element.*;
|
||||||
import com.alibaba.datax.common.exception.DataXException;
|
import com.alibaba.datax.common.exception.DataXException;
|
||||||
import com.alibaba.datax.common.plugin.RecordSender;
|
import com.alibaba.datax.common.plugin.RecordSender;
|
||||||
import com.alibaba.datax.common.spi.Reader;
|
import com.alibaba.datax.common.spi.Reader;
|
||||||
import com.alibaba.datax.common.util.Configuration;
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
import com.alibaba.datax.plugin.writer.tdenginewriter.Key;
|
||||||
import com.alibaba.fastjson.JSON;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
import org.apache.commons.lang3.StringUtils;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -22,84 +20,77 @@ import java.util.regex.Matcher;
|
|||||||
import java.util.regex.Pattern;
|
import java.util.regex.Pattern;
|
||||||
|
|
||||||
public class TDengineReader extends Reader {
|
public class TDengineReader extends Reader {
|
||||||
|
|
||||||
private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
|
private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss";
|
||||||
|
|
||||||
public static class Job extends Reader.Job {
|
public static class Job extends Reader.Job {
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
|
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
|
||||||
private Configuration originalConfig;
|
private Configuration originalConfig;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init() {
|
public void init() {
|
||||||
this.originalConfig = super.getPluginJobConf();
|
this.originalConfig = super.getPluginJobConf();
|
||||||
// check user
|
// check username
|
||||||
String user = this.originalConfig.getString(Key.USER);
|
String username = this.originalConfig.getString(Key.USERNAME);
|
||||||
if (StringUtils.isBlank(user))
|
if (StringUtils.isBlank(username))
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.USER + "] is not set.");
|
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
|
||||||
|
"The parameter [" + Key.USERNAME + "] is not set.");
|
||||||
|
|
||||||
// check password
|
// check password
|
||||||
String password = this.originalConfig.getString(Key.PASSWORD);
|
String password = this.originalConfig.getString(Key.PASSWORD);
|
||||||
if (StringUtils.isBlank(password))
|
if (StringUtils.isBlank(password))
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PASSWORD + "] is not set.");
|
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
|
||||||
|
"The parameter [" + Key.PASSWORD + "] is not set.");
|
||||||
SimpleDateFormat format = new SimpleDateFormat(DATETIME_FORMAT);
|
|
||||||
// check beginDateTime
|
|
||||||
String beginDatetime = this.originalConfig.getString(Key.BEGIN_DATETIME);
|
|
||||||
if (StringUtils.isBlank(beginDatetime))
|
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.BEGIN_DATETIME + "] is not set.");
|
|
||||||
Long start;
|
|
||||||
try {
|
|
||||||
start = format.parse(beginDatetime).getTime();
|
|
||||||
} catch (ParseException e) {
|
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.BEGIN_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format.");
|
|
||||||
}
|
|
||||||
|
|
||||||
// check endDateTime
|
|
||||||
String endDatetime = this.originalConfig.getString(Key.END_DATETIME);
|
|
||||||
if (StringUtils.isBlank(endDatetime))
|
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.END_DATETIME + "] is not set.");
|
|
||||||
Long end;
|
|
||||||
try {
|
|
||||||
end = format.parse(endDatetime).getTime();
|
|
||||||
} catch (ParseException e) {
|
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.END_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format.");
|
|
||||||
}
|
|
||||||
if (start >= end)
|
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.BEGIN_DATETIME + "] should be less than the parameter [" + Key.END_DATETIME + "].");
|
|
||||||
|
|
||||||
// check splitInterval
|
|
||||||
String splitInterval = this.originalConfig.getString(Key.SPLIT_INTERVAL);
|
|
||||||
Long split;
|
|
||||||
if (StringUtils.isBlank(splitInterval))
|
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.SPLIT_INTERVAL + "] is not set.");
|
|
||||||
try {
|
|
||||||
split = parseSplitInterval(splitInterval);
|
|
||||||
} catch (Exception e) {
|
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.SPLIT_INTERVAL + "] should be like: \"123d|h|m|s\", error: " + e.getMessage());
|
|
||||||
}
|
|
||||||
|
|
||||||
this.originalConfig.set(Key.BEGIN_DATETIME, start);
|
|
||||||
this.originalConfig.set(Key.END_DATETIME, end);
|
|
||||||
this.originalConfig.set(Key.SPLIT_INTERVAL, split);
|
|
||||||
|
|
||||||
// check connection
|
// check connection
|
||||||
List<Object> connection = this.originalConfig.getList(Key.CONNECTION);
|
List<Object> connection = this.originalConfig.getList(Key.CONNECTION);
|
||||||
if (connection == null || connection.isEmpty())
|
if (connection == null || connection.isEmpty())
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.CONNECTION + "] is not set.");
|
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
|
||||||
|
"The parameter [" + Key.CONNECTION + "] is not set.");
|
||||||
for (int i = 0; i < connection.size(); i++) {
|
for (int i = 0; i < connection.size(); i++) {
|
||||||
Configuration conn = Configuration.from(connection.get(i).toString());
|
Configuration conn = Configuration.from(connection.get(i).toString());
|
||||||
List<Object> table = conn.getList(Key.TABLE);
|
List<Object> table = conn.getList(Key.TABLE);
|
||||||
if (table == null || table.isEmpty())
|
if (table == null || table.isEmpty())
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.TABLE + "] of connection[" + (i + 1) + "] is not set.");
|
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
|
||||||
|
"The parameter [" + Key.TABLE + "] of connection[" + (i + 1) + "] is not set.");
|
||||||
String jdbcUrl = conn.getString(Key.JDBC_URL);
|
String jdbcUrl = conn.getString(Key.JDBC_URL);
|
||||||
if (StringUtils.isBlank(jdbcUrl))
|
if (StringUtils.isBlank(jdbcUrl))
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.JDBC_URL + "] of connection[" + (i + 1) + "] is not set.");
|
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
|
||||||
|
"The parameter [" + Key.JDBC_URL + "] of connection[" + (i + 1) + "] is not set.");
|
||||||
}
|
}
|
||||||
|
|
||||||
// check column
|
// check column
|
||||||
List<Object> column = this.originalConfig.getList(Key.COLUMN);
|
List<Object> column = this.originalConfig.getList(Key.COLUMN);
|
||||||
if (column == null || column.isEmpty())
|
if (column == null || column.isEmpty())
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.CONNECTION + "] is not set or is empty.");
|
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
|
||||||
|
"The parameter [" + Key.CONNECTION + "] is not set or is empty.");
|
||||||
|
|
||||||
|
SimpleDateFormat format = new SimpleDateFormat(DATETIME_FORMAT);
|
||||||
|
// check beginDateTime
|
||||||
|
String beginDatetime = this.originalConfig.getString(Key.BEGIN_DATETIME);
|
||||||
|
long start = Long.MIN_VALUE;
|
||||||
|
if (!StringUtils.isBlank(beginDatetime)) {
|
||||||
|
try {
|
||||||
|
start = format.parse(beginDatetime).getTime();
|
||||||
|
} catch (ParseException e) {
|
||||||
|
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE,
|
||||||
|
"The parameter [" + Key.BEGIN_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// check endDateTime
|
||||||
|
String endDatetime = this.originalConfig.getString(Key.END_DATETIME);
|
||||||
|
long end = Long.MAX_VALUE;
|
||||||
|
if (!StringUtils.isBlank(endDatetime)) {
|
||||||
|
try {
|
||||||
|
end = format.parse(endDatetime).getTime();
|
||||||
|
} catch (ParseException e) {
|
||||||
|
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE,
|
||||||
|
"The parameter [" + Key.END_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if (start >= end)
|
||||||
|
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE,
|
||||||
|
"The parameter [" + Key.BEGIN_DATETIME + "] should be less than the parameter [" + Key.END_DATETIME + "].");
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -110,78 +101,58 @@ public class TDengineReader extends Reader {
|
|||||||
@Override
|
@Override
|
||||||
public List<Configuration> split(int adviceNumber) {
|
public List<Configuration> split(int adviceNumber) {
|
||||||
List<Configuration> configurations = new ArrayList<>();
|
List<Configuration> configurations = new ArrayList<>();
|
||||||
// do split
|
|
||||||
Long start = this.originalConfig.getLong(Key.BEGIN_DATETIME);
|
|
||||||
Long end = this.originalConfig.getLong(Key.END_DATETIME);
|
|
||||||
Long split = this.originalConfig.getLong(Key.SPLIT_INTERVAL);
|
|
||||||
|
|
||||||
List<Object> conns = this.originalConfig.getList(Key.CONNECTION);
|
List<Object> connectionList = this.originalConfig.getList(Key.CONNECTION);
|
||||||
|
for (Object conn : connectionList) {
|
||||||
for (Long ts = start; ts < end; ts += split) {
|
Configuration clone = this.originalConfig.clone();
|
||||||
for (int i = 0; i < conns.size(); i++) {
|
Configuration conf = Configuration.from(conn.toString());
|
||||||
Configuration clone = this.originalConfig.clone();
|
String jdbcUrl = conf.getString(Key.JDBC_URL);
|
||||||
clone.remove(Key.SPLIT_INTERVAL);
|
clone.set(Key.JDBC_URL, jdbcUrl);
|
||||||
|
clone.set(Key.TABLE, conf.getList(Key.TABLE));
|
||||||
clone.set(Key.BEGIN_DATETIME, ts);
|
clone.remove(Key.CONNECTION);
|
||||||
clone.set(Key.END_DATETIME, Math.min(ts + split, end));
|
configurations.add(clone);
|
||||||
|
|
||||||
Configuration conf = Configuration.from(conns.get(i).toString());
|
|
||||||
String jdbcUrl = conf.getString(Key.JDBC_URL);
|
|
||||||
clone.set(Key.JDBC_URL, jdbcUrl);
|
|
||||||
clone.set(Key.TABLE, conf.getList(Key.TABLE));
|
|
||||||
|
|
||||||
// 抽取 jdbcUrl 中的 ip/port 进行资源使用的打标,以提供给 core 做有意义的 shuffle 操作
|
|
||||||
clone.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, DataBaseType.parseIpFromJdbcUrl(jdbcUrl));
|
|
||||||
clone.remove(Key.CONNECTION);
|
|
||||||
|
|
||||||
configurations.add(clone);
|
|
||||||
LOG.info("Configuration: {}", JSON.toJSONString(clone));
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
LOG.info("Configuration: {}", configurations);
|
||||||
return configurations;
|
return configurations;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static class Task extends Reader.Task {
|
public static class Task extends Reader.Task {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
|
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
|
||||||
private Connection conn;
|
|
||||||
|
|
||||||
private Long startTime;
|
private Configuration readerSliceConfig;
|
||||||
private Long endTime;
|
private Connection conn;
|
||||||
private List<String> tables;
|
private List<String> tables;
|
||||||
private List<String> columns;
|
private List<String> columns;
|
||||||
private String mandatoryEncoding;
|
|
||||||
|
private String startTime;
|
||||||
|
private String endTime;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init() {
|
public void init() {
|
||||||
Configuration readerSliceConfig = super.getPluginJobConf();
|
this.readerSliceConfig = super.getPluginJobConf();
|
||||||
LOG.info("getPluginJobConf: {}", JSON.toJSONString(readerSliceConfig));
|
LOG.info("getPluginJobConf: {}", readerSliceConfig);
|
||||||
|
|
||||||
String url = readerSliceConfig.getString(Key.JDBC_URL);
|
String url = readerSliceConfig.getString(Key.JDBC_URL);
|
||||||
if (StringUtils.isBlank(url))
|
if (StringUtils.isBlank(url))
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
|
throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE,
|
||||||
"The parameter [" + Key.JDBC_URL + "] is not set.");
|
"The parameter [" + Key.JDBC_URL + "] is not set.");
|
||||||
|
String user = readerSliceConfig.getString(Key.USERNAME);
|
||||||
tables = readerSliceConfig.getList(Key.TABLE, String.class);
|
|
||||||
columns = readerSliceConfig.getList(Key.COLUMN, String.class);
|
|
||||||
|
|
||||||
String user = readerSliceConfig.getString(Key.USER);
|
|
||||||
String password = readerSliceConfig.getString(Key.PASSWORD);
|
String password = readerSliceConfig.getString(Key.PASSWORD);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
conn = DriverManager.getConnection(url, user, password);
|
this.conn = DriverManager.getConnection(url, user, password);
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.CONNECTION_FAILED,
|
throw DataXException.asDataXException(TDengineReaderErrorCode.CONNECTION_FAILED,
|
||||||
"The parameter [" + Key.JDBC_URL + "] : " + url + " failed to connect since: " + e.getMessage());
|
"The parameter [" + Key.JDBC_URL + "] : " + url + " failed to connect since: " + e.getMessage(), e);
|
||||||
}
|
}
|
||||||
|
|
||||||
this.mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, "");
|
this.tables = readerSliceConfig.getList(Key.TABLE, String.class);
|
||||||
|
this.columns = readerSliceConfig.getList(Key.COLUMN, String.class);
|
||||||
startTime = readerSliceConfig.getLong(Key.BEGIN_DATETIME);
|
this.startTime = readerSliceConfig.getString(Key.BEGIN_DATETIME);
|
||||||
endTime = readerSliceConfig.getLong(Key.END_DATETIME);
|
this.endTime = readerSliceConfig.getString(Key.END_DATETIME);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void destroy() {
|
public void destroy() {
|
||||||
|
|
||||||
@ -189,18 +160,31 @@ public class TDengineReader extends Reader {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void startRead(RecordSender recordSender) {
|
public void startRead(RecordSender recordSender) {
|
||||||
|
|
||||||
try (Statement stmt = conn.createStatement()) {
|
try (Statement stmt = conn.createStatement()) {
|
||||||
for (String table : tables) {
|
for (String table : tables) {
|
||||||
String sql = "select " + StringUtils.join(columns, ",") + " from " + table
|
StringBuilder sb = new StringBuilder();
|
||||||
+ " where _c0 >= " + startTime + " and _c0 < " + endTime;
|
sb.append("select ").append(StringUtils.join(columns, ","))
|
||||||
|
.append(" from ").append(table).append(" ");
|
||||||
|
|
||||||
|
if (StringUtils.isBlank(startTime)) {
|
||||||
|
sb.append("where _c0 >= ").append(Long.MIN_VALUE);
|
||||||
|
} else {
|
||||||
|
sb.append("where _c0 >= '").append(startTime).append("'");
|
||||||
|
}
|
||||||
|
if (!StringUtils.isBlank(endTime)) {
|
||||||
|
sb.append(" and _c0 < '").append(endTime).append("'");
|
||||||
|
}
|
||||||
|
|
||||||
|
String sql = sb.toString();
|
||||||
ResultSet rs = stmt.executeQuery(sql);
|
ResultSet rs = stmt.executeQuery(sql);
|
||||||
ResultSetMetaData metaData = rs.getMetaData();
|
|
||||||
while (rs.next()) {
|
while (rs.next()) {
|
||||||
transportOneRecord(recordSender, rs, metaData, metaData.getColumnCount(), this.mandatoryEncoding);
|
Record record = buildRecord(recordSender, rs, "UTF-8");
|
||||||
|
recordSender.sendToWriter(record);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (SQLException e) {
|
} catch (SQLException e) {
|
||||||
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错!", e);
|
throw DataXException.asDataXException(TDengineReaderErrorCode.RUNTIME_EXCEPTION, e.getMessage(), e);
|
||||||
} finally {
|
} finally {
|
||||||
try {
|
try {
|
||||||
if (conn != null)
|
if (conn != null)
|
||||||
@ -211,17 +195,11 @@ public class TDengineReader extends Reader {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private Record transportOneRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnCount, String mandatoryEncoding) {
|
private Record buildRecord(RecordSender recordSender, ResultSet rs, String mandatoryEncoding) {
|
||||||
Record record = buildRecord(recordSender, rs, metaData, columnCount, mandatoryEncoding);
|
|
||||||
recordSender.sendToWriter(record);
|
|
||||||
return record;
|
|
||||||
}
|
|
||||||
|
|
||||||
private Record buildRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnCount, String mandatoryEncoding) {
|
|
||||||
Record record = recordSender.createRecord();
|
Record record = recordSender.createRecord();
|
||||||
|
|
||||||
try {
|
try {
|
||||||
for (int i = 1; i <= columnCount; i++) {
|
ResultSetMetaData metaData = rs.getMetaData();
|
||||||
|
for (int i = 1; i <= metaData.getColumnCount(); i++) {
|
||||||
int columnType = metaData.getColumnType(i);
|
int columnType = metaData.getColumnType(i);
|
||||||
switch (columnType) {
|
switch (columnType) {
|
||||||
case Types.SMALLINT:
|
case Types.SMALLINT:
|
||||||
@ -261,16 +239,16 @@ public class TDengineReader extends Reader {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final long second = 1000;
|
|
||||||
private static final long minute = 60 * second;
|
|
||||||
private static final long hour = 60 * minute;
|
|
||||||
private static final long day = 24 * hour;
|
|
||||||
|
|
||||||
private static Long parseSplitInterval(String splitInterval) throws Exception {
|
private static Long parseSplitInterval(String splitInterval) throws Exception {
|
||||||
|
final long second = 1000;
|
||||||
|
final long minute = 60 * second;
|
||||||
|
final long hour = 60 * minute;
|
||||||
|
final long day = 24 * hour;
|
||||||
|
|
||||||
Pattern compile = Pattern.compile("^(\\d+)([dhms])$");
|
Pattern compile = Pattern.compile("^(\\d+)([dhms])$");
|
||||||
Matcher matcher = compile.matcher(splitInterval);
|
Matcher matcher = compile.matcher(splitInterval);
|
||||||
while (matcher.find()) {
|
while (matcher.find()) {
|
||||||
Long value = Long.valueOf(matcher.group(1));
|
long value = Long.parseLong(matcher.group(1));
|
||||||
if (value == 0)
|
if (value == 0)
|
||||||
throw new Exception("invalid splitInterval: 0");
|
throw new Exception("invalid splitInterval: 0");
|
||||||
char unit = matcher.group(2).charAt(0);
|
char unit = matcher.group(2).charAt(0);
|
||||||
|
@ -4,9 +4,10 @@ import com.alibaba.datax.common.spi.ErrorCode;
|
|||||||
|
|
||||||
public enum TDengineReaderErrorCode implements ErrorCode {
|
public enum TDengineReaderErrorCode implements ErrorCode {
|
||||||
|
|
||||||
REQUIRED_VALUE("TDengineReader-00", "缺失必要的值"),
|
REQUIRED_VALUE("TDengineReader-00", "parameter value is missing"),
|
||||||
ILLEGAL_VALUE("TDengineReader-01", "值非法"),
|
ILLEGAL_VALUE("TDengineReader-01", "invalid parameter value"),
|
||||||
CONNECTION_FAILED("TDengineReader-02", "连接错误");
|
CONNECTION_FAILED("TDengineReader-02", "connection error"),
|
||||||
|
RUNTIME_EXCEPTION("TDengineWriter-03", "runtime exception");
|
||||||
|
|
||||||
private final String code;
|
private final String code;
|
||||||
private final String description;
|
private final String description;
|
||||||
|
@ -0,0 +1,87 @@
|
|||||||
|
package com.alibaba.datax.plugin.reader;
|
||||||
|
|
||||||
|
import com.alibaba.datax.core.Engine;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Ignore;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.sql.DriverManager;
|
||||||
|
import java.sql.SQLException;
|
||||||
|
import java.sql.Statement;
|
||||||
|
import java.util.Random;
|
||||||
|
|
||||||
|
@Ignore
|
||||||
|
public class TDengine2DMTest {
|
||||||
|
private static final String host1 = "192.168.56.105";
|
||||||
|
private static final String host2 = "192.168.0.72";
|
||||||
|
|
||||||
|
private final Random random = new Random(System.currentTimeMillis());
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void t2dm_case01() throws Throwable {
|
||||||
|
// given
|
||||||
|
createSupTable("ms");
|
||||||
|
|
||||||
|
// when
|
||||||
|
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2dm.json"};
|
||||||
|
System.setProperty("datax.home", "../target/datax/datax");
|
||||||
|
Engine.entry(params);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void t2dm_case02() throws Throwable {
|
||||||
|
// given
|
||||||
|
createSupTable("us");
|
||||||
|
|
||||||
|
// when
|
||||||
|
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2dm.json"};
|
||||||
|
System.setProperty("datax.home", "../target/datax/datax");
|
||||||
|
Engine.entry(params);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void t2dm_case03() throws Throwable {
|
||||||
|
// given
|
||||||
|
createSupTable("ns");
|
||||||
|
|
||||||
|
// when
|
||||||
|
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2dm.json"};
|
||||||
|
System.setProperty("datax.home", "../target/datax/datax");
|
||||||
|
Engine.entry(params);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void createSupTable(String precision) throws SQLException {
|
||||||
|
final String url = "jdbc:TAOS-RS://" + host1 + ":6041/";
|
||||||
|
try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) {
|
||||||
|
Statement stmt = conn.createStatement();
|
||||||
|
|
||||||
|
stmt.execute("drop database if exists db1");
|
||||||
|
stmt.execute("create database if not exists db1 precision '" + precision + "'");
|
||||||
|
stmt.execute("create table db1.stb1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, " +
|
||||||
|
"f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, t3 smallint, " +
|
||||||
|
"t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(100))");
|
||||||
|
|
||||||
|
for (int i = 1; i <= 10; i++) {
|
||||||
|
stmt.execute("insert into db1.tb" + i + " using db1.stb1 tags(now, " + random.nextInt(10) + "," +
|
||||||
|
random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextInt(10) + "," +
|
||||||
|
random.nextFloat() + "," + random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123'," +
|
||||||
|
"'北京朝阳望京') values(now+" + i + "s, " + random.nextInt(10) + "," + random.nextInt(10) + "," +
|
||||||
|
+random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextFloat() + "," +
|
||||||
|
random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123','北京朝阳望京')");
|
||||||
|
}
|
||||||
|
stmt.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
final String url2 = "jdbc:dm://" + host2 + ":5236";
|
||||||
|
try (Connection conn = DriverManager.getConnection(url2, "TESTUSER", "test123456")) {
|
||||||
|
conn.setAutoCommit(true);
|
||||||
|
Statement stmt = conn.createStatement();
|
||||||
|
stmt.execute("drop table if exists stb2");
|
||||||
|
stmt.execute("create table stb2(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, " +
|
||||||
|
"f6 double, f7 BIT, f8 VARCHAR(100), f9 VARCHAR2(200), t1 timestamp, t2 tinyint, t3 smallint, " +
|
||||||
|
"t4 int, t5 bigint, t6 float, t7 double, t8 BIT, t9 VARCHAR(100), t10 VARCHAR2(200))");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
@ -1,72 +1,118 @@
|
|||||||
package com.alibaba.datax.plugin.reader;
|
package com.alibaba.datax.plugin.reader;
|
||||||
|
|
||||||
import com.alibaba.datax.common.util.Configuration;
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
|
import com.alibaba.datax.plugin.writer.tdenginewriter.Key;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
import java.text.ParseException;
|
|
||||||
import java.text.SimpleDateFormat;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public class TDengineReaderTest {
|
public class TDengineReaderTest {
|
||||||
|
|
||||||
TDengineReader.Job job;
|
@Test
|
||||||
|
public void jobInit_case01() {
|
||||||
@Before
|
// given
|
||||||
public void before() {
|
TDengineReader.Job job = new TDengineReader.Job();
|
||||||
job = new TDengineReader.Job();
|
|
||||||
Configuration configuration = Configuration.from("{" +
|
Configuration configuration = Configuration.from("{" +
|
||||||
"\"user\": \"root\"," +
|
"\"username\": \"root\"," +
|
||||||
"\"password\": \"taosdata\"," +
|
"\"password\": \"taosdata\"," +
|
||||||
"\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," +
|
"\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," +
|
||||||
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," +
|
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," +
|
||||||
"\"beginDateTime\": \"2021-01-01 00:00:00\"," +
|
"\"beginDateTime\": \"2021-01-01 00:00:00\"," +
|
||||||
"\"endDateTime\": \"2021-01-01 12:00:00\"," +
|
"\"endDateTime\": \"2021-01-01 12:00:00\"" +
|
||||||
"\"splitInterval\": \"1h\"" +
|
|
||||||
"}");
|
"}");
|
||||||
job.setPluginJobConf(configuration);
|
job.setPluginJobConf(configuration);
|
||||||
}
|
|
||||||
|
|
||||||
@Test
|
|
||||||
public void jobInit() throws ParseException {
|
|
||||||
// when
|
// when
|
||||||
job.init();
|
job.init();
|
||||||
|
|
||||||
// assert
|
// assert
|
||||||
Configuration conf = job.getPluginJobConf();
|
Configuration conf = job.getPluginJobConf();
|
||||||
|
|
||||||
Assert.assertEquals("root", conf.getString("user"));
|
Assert.assertEquals("root", conf.getString(Key.USERNAME));
|
||||||
Assert.assertEquals("taosdata", conf.getString("password"));
|
Assert.assertEquals("taosdata", conf.getString("password"));
|
||||||
Assert.assertEquals("weather", conf.getString("connection[0].table[0]"));
|
Assert.assertEquals("weather", conf.getString("connection[0].table[0]"));
|
||||||
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl"));
|
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl"));
|
||||||
|
Assert.assertEquals("2021-01-01 00:00:00", conf.getString("beginDateTime"));
|
||||||
|
Assert.assertEquals("2021-01-01 12:00:00", conf.getString("endDateTime"));
|
||||||
|
}
|
||||||
|
|
||||||
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
|
|
||||||
|
|
||||||
Long start = sdf.parse("2021-01-01 00:00:00").getTime();
|
@Test
|
||||||
Assert.assertEquals(start, conf.getLong("beginDateTime"));
|
public void jobInit_case02() {
|
||||||
|
// given
|
||||||
|
TDengineReader.Job job = new TDengineReader.Job();
|
||||||
|
Configuration configuration = Configuration.from("{" +
|
||||||
|
"\"username\": \"root\"," +
|
||||||
|
"\"password\": \"taosdata\"," +
|
||||||
|
"\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," +
|
||||||
|
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]" +
|
||||||
|
"}");
|
||||||
|
job.setPluginJobConf(configuration);
|
||||||
|
|
||||||
Long end = sdf.parse("2021-01-01 12:00:00").getTime();
|
// when
|
||||||
Assert.assertEquals(end, conf.getLong("endDateTime"));
|
job.init();
|
||||||
|
|
||||||
Assert.assertEquals(new Long(3600 * 1000), conf.getLong("splitInterval"));
|
// assert
|
||||||
|
Configuration conf = job.getPluginJobConf();
|
||||||
|
|
||||||
|
Assert.assertEquals("root", conf.getString(Key.USERNAME));
|
||||||
|
Assert.assertEquals("taosdata", conf.getString("password"));
|
||||||
|
Assert.assertEquals("weather", conf.getString("connection[0].table[0]"));
|
||||||
|
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void jobSplit() {
|
public void jobSplit_case01() {
|
||||||
|
// given
|
||||||
|
TDengineReader.Job job = new TDengineReader.Job();
|
||||||
|
Configuration configuration = Configuration.from("{" +
|
||||||
|
"\"username\": \"root\"," +
|
||||||
|
"\"password\": \"taosdata\"," +
|
||||||
|
"\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," +
|
||||||
|
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," +
|
||||||
|
"\"beginDateTime\": \"2021-01-01 00:00:00\"," +
|
||||||
|
"\"endDateTime\": \"2021-01-01 12:00:00\"" +
|
||||||
|
"}");
|
||||||
|
job.setPluginJobConf(configuration);
|
||||||
|
|
||||||
// when
|
// when
|
||||||
job.init();
|
job.init();
|
||||||
List<Configuration> configurationList = job.split(1);
|
List<Configuration> configurationList = job.split(1);
|
||||||
|
|
||||||
// assert
|
// assert
|
||||||
Assert.assertEquals(12, configurationList.size());
|
Assert.assertEquals(1, configurationList.size());
|
||||||
for (int i = 0; i < configurationList.size(); i++) {
|
Configuration conf = configurationList.get(0);
|
||||||
Configuration conf = configurationList.get(i);
|
Assert.assertEquals("root", conf.getString("username"));
|
||||||
Assert.assertEquals("root", conf.getString("user"));
|
Assert.assertEquals("taosdata", conf.getString("password"));
|
||||||
Assert.assertEquals("taosdata", conf.getString("password"));
|
Assert.assertEquals("weather", conf.getString("table[0]"));
|
||||||
Assert.assertEquals("weather", conf.getString("table[0]"));
|
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl"));
|
||||||
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void jobSplit_case02() {
|
||||||
|
// given
|
||||||
|
TDengineReader.Job job = new TDengineReader.Job();
|
||||||
|
Configuration configuration = Configuration.from("{" +
|
||||||
|
"\"username\": \"root\"," +
|
||||||
|
"\"password\": \"taosdata\"," +
|
||||||
|
"\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," +
|
||||||
|
"\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," +
|
||||||
|
"}");
|
||||||
|
job.setPluginJobConf(configuration);
|
||||||
|
|
||||||
|
// when
|
||||||
|
job.init();
|
||||||
|
List<Configuration> configurationList = job.split(1);
|
||||||
|
|
||||||
|
// assert
|
||||||
|
Assert.assertEquals(1, configurationList.size());
|
||||||
|
Configuration conf = configurationList.get(0);
|
||||||
|
Assert.assertEquals("root", conf.getString("username"));
|
||||||
|
Assert.assertEquals("taosdata", conf.getString("password"));
|
||||||
|
Assert.assertEquals("weather", conf.getString("table[0]"));
|
||||||
|
Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl"));
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
50
tdenginereader/src/test/resources/t2dm.json
Normal file
50
tdenginereader/src/test/resources/t2dm.json
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
{
|
||||||
|
"job": {
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"reader": {
|
||||||
|
"name": "tdenginereader",
|
||||||
|
"parameter": {
|
||||||
|
"username": "root",
|
||||||
|
"password": "taosdata",
|
||||||
|
"column": [
|
||||||
|
"*"
|
||||||
|
],
|
||||||
|
"connection": [
|
||||||
|
{
|
||||||
|
"table": [
|
||||||
|
"stb1"
|
||||||
|
],
|
||||||
|
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1"
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"writer": {
|
||||||
|
"name": "rdbmswriter",
|
||||||
|
"parameter": {
|
||||||
|
"connection": [
|
||||||
|
{
|
||||||
|
"table": [
|
||||||
|
"stb2"
|
||||||
|
],
|
||||||
|
"jdbcUrl": "jdbc:dm://192.168.0.72:5236"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"username": "TESTUSER",
|
||||||
|
"password": "test123456",
|
||||||
|
"table": "stb2",
|
||||||
|
"column": [
|
||||||
|
"*"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"setting": {
|
||||||
|
"speed": {
|
||||||
|
"channel": 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -75,14 +75,14 @@
|
|||||||
<scope>test</scope>
|
<scope>test</scope>
|
||||||
</dependency>
|
</dependency>
|
||||||
<!-- 添加 dm8 jdbc jar 包依赖-->
|
<!-- 添加 dm8 jdbc jar 包依赖-->
|
||||||
<dependency>
|
<!-- <dependency>-->
|
||||||
<groupId>com.dameng</groupId>
|
<!-- <groupId>com.dameng</groupId>-->
|
||||||
<artifactId>dm-jdbc</artifactId>
|
<!-- <artifactId>dm-jdbc</artifactId>-->
|
||||||
<version>1.8</version>
|
<!-- <version>1.8</version>-->
|
||||||
<scope>system</scope>
|
<!-- <scope>system</scope>-->
|
||||||
<systemPath>${project.basedir}/src/test/resources/DmJdbcDriver18.jar
|
<!-- <systemPath>${project.basedir}/src/test/resources/DmJdbcDriver18.jar-->
|
||||||
</systemPath>
|
<!-- </systemPath>-->
|
||||||
</dependency>
|
<!-- </dependency>-->
|
||||||
|
|
||||||
</dependencies>
|
</dependencies>
|
||||||
|
|
||||||
|
@ -19,6 +19,15 @@ import java.util.Date;
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
public class DefaultDataHandler implements DataHandler {
|
public class DefaultDataHandler implements DataHandler {
|
||||||
|
static {
|
||||||
|
try {
|
||||||
|
Class.forName("com.taosdata.jdbc.TSDBDriver");
|
||||||
|
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
|
||||||
|
} catch (ClassNotFoundException e) {
|
||||||
|
e.printStackTrace();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
|
private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
|
||||||
|
|
||||||
private String username;
|
private String username;
|
||||||
@ -62,6 +71,7 @@ public class DefaultDataHandler implements DataHandler {
|
|||||||
int count = 0;
|
int count = 0;
|
||||||
int affectedRows = 0;
|
int affectedRows = 0;
|
||||||
|
|
||||||
|
|
||||||
try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
|
try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
|
||||||
LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established.");
|
LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established.");
|
||||||
// prepare table_name -> table_meta
|
// prepare table_name -> table_meta
|
||||||
|
@ -9,4 +9,7 @@ public class Key {
|
|||||||
public static final String JDBC_URL = "jdbcUrl";
|
public static final String JDBC_URL = "jdbcUrl";
|
||||||
public static final String COLUMN = "column";
|
public static final String COLUMN = "column";
|
||||||
public static final String IGNORE_TAGS_UNMATCHED = "ignoreTagsUnmatched";
|
public static final String IGNORE_TAGS_UNMATCHED = "ignoreTagsUnmatched";
|
||||||
|
|
||||||
|
public static final String BEGIN_DATETIME = "beginDateTime";
|
||||||
|
public static final String END_DATETIME = "endDateTime";
|
||||||
}
|
}
|
@ -17,7 +17,7 @@ public class SchemaManager {
|
|||||||
private final Connection conn;
|
private final Connection conn;
|
||||||
private TimestampPrecision precision;
|
private TimestampPrecision precision;
|
||||||
|
|
||||||
SchemaManager(Connection conn) {
|
public SchemaManager(Connection conn) {
|
||||||
this.conn = conn;
|
this.conn = conn;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -25,7 +25,7 @@ public class TDengineWriter extends Writer {
|
|||||||
this.originalConfig = super.getPluginJobConf();
|
this.originalConfig = super.getPluginJobConf();
|
||||||
this.originalConfig.set(PEER_PLUGIN_NAME, getPeerPluginName());
|
this.originalConfig.set(PEER_PLUGIN_NAME, getPeerPluginName());
|
||||||
|
|
||||||
// check user
|
// check username
|
||||||
String user = this.originalConfig.getString(Key.USERNAME);
|
String user = this.originalConfig.getString(Key.USERNAME);
|
||||||
if (StringUtils.isBlank(user))
|
if (StringUtils.isBlank(user))
|
||||||
throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter ["
|
throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter ["
|
||||||
|
@ -99,7 +99,7 @@ public class DM2TDengineTest {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void before() throws SQLException, ClassNotFoundException {
|
public void before() throws SQLException {
|
||||||
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
|
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
|
||||||
long ts = System.currentTimeMillis();
|
long ts = System.currentTimeMillis();
|
||||||
|
|
||||||
|
9
tdenginewriter/src/test/resources/incremental_sync/clean_env.sh
Executable file
9
tdenginewriter/src/test/resources/incremental_sync/clean_env.sh
Executable file
@ -0,0 +1,9 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
datax_home_dir=$(dirname $(readlink -f "$0"))
|
||||||
|
|
||||||
|
curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'drop table if exists db2.stb2;' 192.168.1.93:6041/rest/sql
|
||||||
|
curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'create table if not exists db2.stb2 (`ts` TIMESTAMP,`f2` SMALLINT,`f4` BIGINT,`f5` FLOAT,`f6` DOUBLE,`f7` DOUBLE,`f8` BOOL,`f9` NCHAR(100),`f10` NCHAR(200)) TAGS (`f1` TINYINT,`f3` INT);' 192.168.1.93:6041/rest/sql
|
||||||
|
|
||||||
|
rm -f ${datax_home_dir}/log/*
|
||||||
|
rm -f ${datax_home_dir}/job/*.csv
|
@ -0,0 +1,63 @@
|
|||||||
|
{
|
||||||
|
"job": {
|
||||||
|
"content": [
|
||||||
|
{
|
||||||
|
"reader": {
|
||||||
|
"name": "rdbmsreader",
|
||||||
|
"parameter": {
|
||||||
|
"username": "TESTUSER",
|
||||||
|
"password": "test123456",
|
||||||
|
"connection": [
|
||||||
|
{
|
||||||
|
"querySql": [
|
||||||
|
"select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1"
|
||||||
|
],
|
||||||
|
"jdbcUrl": [
|
||||||
|
"jdbc:dm://192.168.0.72:5236"
|
||||||
|
]
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"where": "1=1",
|
||||||
|
"fetchSize": 1024
|
||||||
|
}
|
||||||
|
},
|
||||||
|
"writer": {
|
||||||
|
"name": "tdenginewriter",
|
||||||
|
"parameter": {
|
||||||
|
"username": "root",
|
||||||
|
"password": "taosdata",
|
||||||
|
"column": [
|
||||||
|
"tbname",
|
||||||
|
"ts",
|
||||||
|
"f1",
|
||||||
|
"f2",
|
||||||
|
"f3",
|
||||||
|
"f4",
|
||||||
|
"f5",
|
||||||
|
"f6",
|
||||||
|
"f7",
|
||||||
|
"f8",
|
||||||
|
"f9",
|
||||||
|
"f10"
|
||||||
|
],
|
||||||
|
"connection": [
|
||||||
|
{
|
||||||
|
"table": [
|
||||||
|
"stb2"
|
||||||
|
],
|
||||||
|
"jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"batchSize": 1000,
|
||||||
|
"ignoreTagsUnmatched": true
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
],
|
||||||
|
"setting": {
|
||||||
|
"speed": {
|
||||||
|
"channel": 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
57
tdenginewriter/src/test/resources/incremental_sync/dm2t_sync.sh
Executable file
57
tdenginewriter/src/test/resources/incremental_sync/dm2t_sync.sh
Executable file
@ -0,0 +1,57 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
set -e
|
||||||
|
#set -x
|
||||||
|
|
||||||
|
datax_home_dir=$(dirname $(readlink -f "$0"))
|
||||||
|
table_name="stb1"
|
||||||
|
update_key="ts"
|
||||||
|
|
||||||
|
while getopts "hd:t:" arg; do
|
||||||
|
case $arg in
|
||||||
|
d)
|
||||||
|
datax_home_dir=$(echo $OPTARG)
|
||||||
|
;;
|
||||||
|
v)
|
||||||
|
table_name=$(echo $OPTARG)
|
||||||
|
;;
|
||||||
|
h)
|
||||||
|
echo "Usage: $(basename $0) -d [datax_home_dir] -t [table_name] -k [update_key]"
|
||||||
|
echo " -h help"
|
||||||
|
exit 0
|
||||||
|
;;
|
||||||
|
?) #unknow option
|
||||||
|
echo "unkonw argument"
|
||||||
|
exit 1
|
||||||
|
;;
|
||||||
|
esac
|
||||||
|
done
|
||||||
|
|
||||||
|
if [[ -e ${datax_home_dir}/job/${table_name}.csv ]]; then
|
||||||
|
MAX_TIME=$(cat ${datax_home_dir}/job/${table_name}.csv)
|
||||||
|
else
|
||||||
|
MAX_TIME="null"
|
||||||
|
fi
|
||||||
|
current_datetime=$(date +"%Y-%m-%d %H:%M:%S")
|
||||||
|
current_timestamp=$(date +%s)
|
||||||
|
|
||||||
|
if [ "$MAX_TIME" != "null" ]; then
|
||||||
|
WHERE="${update_key} >= '$MAX_TIME' and ${update_key} < '$current_datetime'"
|
||||||
|
sed "s/1=1/$WHERE/g" ${datax_home_dir}/job/dm2t-update.json >${datax_home_dir}/job/dm2t_${current_timestamp}.json
|
||||||
|
echo "incremental data synchronization, from '${MAX_TIME}' to '${current_datetime}'"
|
||||||
|
python ${datax_home_dir}/bin/datax.py ${datax_home_dir}/job/dm2t_${current_timestamp}.json 1> /dev/null 2>&1
|
||||||
|
else
|
||||||
|
echo "full data synchronization, to '${current_datetime}'"
|
||||||
|
python ${datax_home_dir}/bin/datax.py ${datax_home_dir}/job/dm2t-update.json 1> /dev/null 2>&1
|
||||||
|
fi
|
||||||
|
|
||||||
|
if [[ $? -ne 0 ]]; then
|
||||||
|
echo "datax migration job falied"
|
||||||
|
else
|
||||||
|
echo ${current_datetime} >$datax_home_dir/job/${table_name}.csv
|
||||||
|
echo "datax migration job success"
|
||||||
|
fi
|
||||||
|
|
||||||
|
rm -rf ${datax_home_dir}/job/dm2t_${current_timestamp}.json
|
||||||
|
|
||||||
|
#while true; do ./dm2t_sync.sh; sleep 5s; done
|
5
tdenginewriter/src/test/resources/incremental_sync/upload.sh
Executable file
5
tdenginewriter/src/test/resources/incremental_sync/upload.sh
Executable file
@ -0,0 +1,5 @@
|
|||||||
|
#!/bin/bash
|
||||||
|
|
||||||
|
scp dm2t-update.json root@192.168.56.105:/root/workspace/tmp/datax/job
|
||||||
|
scp dm2t_sync.sh root@192.168.56.105:/root/workspace/tmp/datax
|
||||||
|
scp clean_env.sh root@192.168.56.105:/root/workspace/tmp/datax
|
Loading…
Reference in New Issue
Block a user