diff --git a/tdenginereader/doc/tdenginereader.md b/tdenginereader/doc/tdenginereader-CN.md similarity index 88% rename from tdenginereader/doc/tdenginereader.md rename to tdenginereader/doc/tdenginereader-CN.md index 3c683a64..fe0e4877 100644 --- a/tdenginereader/doc/tdenginereader.md +++ b/tdenginereader/doc/tdenginereader-CN.md @@ -37,8 +37,7 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。 "phase" ], "beginDateTime": "2017-07-14 10:40:00", - "endDateTime": "2017-08-14 10:40:00", - "splitInterval": "1d" + "endDateTime": "2017-08-14 10:40:00" } }, "writer": { @@ -79,17 +78,12 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。 * 必选:是
* 默认值:无
* **beginDateTime** - * 描述:数据的开始时间,Job迁移从begineDateTime到endDateTime的数据,格式为yyyy-MM-dd HH:mm:ss
- * 必选:是
+ * 描述:数据的开始时间,Job迁移从begineDateTime到endDateTime的数据,格式为yyyy-MM-dd HH:mm:ss,如果不填为全量同步
+ * 必选:否
* 默认值:无
* **endDateTime** - * 描述:数据的结束时间,Job迁移从begineDateTime到endDateTime的数据,格式为yyyy-MM-dd HH:mm:ss
- * 必选:是
- * 默认值:无
-* **splitInterval** - * 描述:按照splitInterval来划分task, 每splitInterval创建一个task。例如,20d代表按照每20天的数据划分为1个task。 - 可以配置的时间单位为:d(天), h(小时), m(分钟), s(秒)
- * 必选:是
+ * 描述:数据的结束时间,Job迁移从begineDateTime到endDateTime的数据,格式为yyyy-MM-dd HH:mm:ss,如果不填为全量同步
+ * 必选:否
* 默认值:无
### 3.3 类型转换 @@ -106,7 +100,6 @@ TDengineReader 通过TDengine的JDBC driver查询获取数据。 | BINARY | Bytes | | NCHAR | String | - ## 4 性能报告 ### 4.1 环境准备 diff --git a/tdenginereader/pom.xml b/tdenginereader/pom.xml index ca444bce..69ea0c0d 100644 --- a/tdenginereader/pom.xml +++ b/tdenginereader/pom.xml @@ -29,10 +29,29 @@ + + com.alibaba + fastjson + 1.2.78 + + + + com.alibaba.datax.tdenginewriter + tdenginewriter + 0.0.1-SNAPSHOT + compile + + com.taosdata.jdbc taos-jdbcdriver 2.0.37 + + + com.alibaba + fastjson + + @@ -47,6 +66,21 @@ 0.0.1-SNAPSHOT compile + + + com.alibaba.datax + datax-core + 0.0.1-SNAPSHOT + test + + + + com.dameng + dm-jdbc + 1.8 + system + ${project.basedir}/src/test/resources/DmJdbcDriver18.jar + diff --git a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/Key.java b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/Key.java deleted file mode 100644 index 95b55386..00000000 --- a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/Key.java +++ /dev/null @@ -1,19 +0,0 @@ -package com.alibaba.datax.plugin.reader; - -public class Key { - - public static final String JDBC_URL = "jdbcUrl"; -// public static final String HOST = "host"; -// public static final String PORT = "port"; -// public static final String DB = "db"; - public static final String TABLE = "table"; - public static final String USER = "username"; - public static final String PASSWORD = "password"; - public static final String CONNECTION = "connection"; -// public static final String SQL = "sql"; - public static final String BEGIN_DATETIME = "beginDateTime"; - public static final String END_DATETIME = "endDateTime"; - public static final String SPLIT_INTERVAL = "splitInterval"; - public static final String COLUMN = "column"; - public static final String MANDATORY_ENCODING = "mandatoryEncoding"; -} diff --git a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java index f00d879f..6c2f1223 100644 --- a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java +++ b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java @@ -1,13 +1,11 @@ package com.alibaba.datax.plugin.reader; -import com.alibaba.datax.common.constant.CommonConstant; import com.alibaba.datax.common.element.*; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordSender; import com.alibaba.datax.common.spi.Reader; import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.plugin.rdbms.util.DataBaseType; -import com.alibaba.fastjson.JSON; +import com.alibaba.datax.plugin.writer.tdenginewriter.Key; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,84 +20,77 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; public class TDengineReader extends Reader { + private static final String DATETIME_FORMAT = "yyyy-MM-dd HH:mm:ss"; public static class Job extends Reader.Job { - private static final Logger LOG = LoggerFactory.getLogger(Job.class); private Configuration originalConfig; @Override public void init() { this.originalConfig = super.getPluginJobConf(); - // check user - String user = this.originalConfig.getString(Key.USER); - if (StringUtils.isBlank(user)) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.USER + "] is not set."); + // check username + String username = this.originalConfig.getString(Key.USERNAME); + if (StringUtils.isBlank(username)) + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.USERNAME + "] is not set."); // check password String password = this.originalConfig.getString(Key.PASSWORD); if (StringUtils.isBlank(password)) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.PASSWORD + "] is not set."); - - SimpleDateFormat format = new SimpleDateFormat(DATETIME_FORMAT); - // check beginDateTime - String beginDatetime = this.originalConfig.getString(Key.BEGIN_DATETIME); - if (StringUtils.isBlank(beginDatetime)) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.BEGIN_DATETIME + "] is not set."); - Long start; - try { - start = format.parse(beginDatetime).getTime(); - } catch (ParseException e) { - throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.BEGIN_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format."); - } - - // check endDateTime - String endDatetime = this.originalConfig.getString(Key.END_DATETIME); - if (StringUtils.isBlank(endDatetime)) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.END_DATETIME + "] is not set."); - Long end; - try { - end = format.parse(endDatetime).getTime(); - } catch (ParseException e) { - throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.END_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format."); - } - if (start >= end) - throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.BEGIN_DATETIME + "] should be less than the parameter [" + Key.END_DATETIME + "]."); - - // check splitInterval - String splitInterval = this.originalConfig.getString(Key.SPLIT_INTERVAL); - Long split; - if (StringUtils.isBlank(splitInterval)) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.SPLIT_INTERVAL + "] is not set."); - try { - split = parseSplitInterval(splitInterval); - } catch (Exception e) { - throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.SPLIT_INTERVAL + "] should be like: \"123d|h|m|s\", error: " + e.getMessage()); - } - - this.originalConfig.set(Key.BEGIN_DATETIME, start); - this.originalConfig.set(Key.END_DATETIME, end); - this.originalConfig.set(Key.SPLIT_INTERVAL, split); + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.PASSWORD + "] is not set."); // check connection List connection = this.originalConfig.getList(Key.CONNECTION); if (connection == null || connection.isEmpty()) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.CONNECTION + "] is not set."); + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.CONNECTION + "] is not set."); for (int i = 0; i < connection.size(); i++) { Configuration conn = Configuration.from(connection.get(i).toString()); List table = conn.getList(Key.TABLE); if (table == null || table.isEmpty()) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.TABLE + "] of connection[" + (i + 1) + "] is not set."); + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.TABLE + "] of connection[" + (i + 1) + "] is not set."); String jdbcUrl = conn.getString(Key.JDBC_URL); if (StringUtils.isBlank(jdbcUrl)) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.JDBC_URL + "] of connection[" + (i + 1) + "] is not set."); + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.JDBC_URL + "] of connection[" + (i + 1) + "] is not set."); } - // check column List column = this.originalConfig.getList(Key.COLUMN); if (column == null || column.isEmpty()) - throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.CONNECTION + "] is not set or is empty."); + throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.CONNECTION + "] is not set or is empty."); + + SimpleDateFormat format = new SimpleDateFormat(DATETIME_FORMAT); + // check beginDateTime + String beginDatetime = this.originalConfig.getString(Key.BEGIN_DATETIME); + long start = Long.MIN_VALUE; + if (!StringUtils.isBlank(beginDatetime)) { + try { + start = format.parse(beginDatetime).getTime(); + } catch (ParseException e) { + throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, + "The parameter [" + Key.BEGIN_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format."); + } + } + // check endDateTime + String endDatetime = this.originalConfig.getString(Key.END_DATETIME); + long end = Long.MAX_VALUE; + if (!StringUtils.isBlank(endDatetime)) { + try { + end = format.parse(endDatetime).getTime(); + } catch (ParseException e) { + throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, + "The parameter [" + Key.END_DATETIME + "] needs to conform to the [" + DATETIME_FORMAT + "] format."); + } + } + if (start >= end) + throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, + "The parameter [" + Key.BEGIN_DATETIME + "] should be less than the parameter [" + Key.END_DATETIME + "]."); + } @Override @@ -110,78 +101,58 @@ public class TDengineReader extends Reader { @Override public List split(int adviceNumber) { List configurations = new ArrayList<>(); - // do split - Long start = this.originalConfig.getLong(Key.BEGIN_DATETIME); - Long end = this.originalConfig.getLong(Key.END_DATETIME); - Long split = this.originalConfig.getLong(Key.SPLIT_INTERVAL); - List conns = this.originalConfig.getList(Key.CONNECTION); - - for (Long ts = start; ts < end; ts += split) { - for (int i = 0; i < conns.size(); i++) { - Configuration clone = this.originalConfig.clone(); - clone.remove(Key.SPLIT_INTERVAL); - - clone.set(Key.BEGIN_DATETIME, ts); - clone.set(Key.END_DATETIME, Math.min(ts + split, end)); - - Configuration conf = Configuration.from(conns.get(i).toString()); - String jdbcUrl = conf.getString(Key.JDBC_URL); - clone.set(Key.JDBC_URL, jdbcUrl); - clone.set(Key.TABLE, conf.getList(Key.TABLE)); - - // 抽取 jdbcUrl 中的 ip/port 进行资源使用的打标,以提供给 core 做有意义的 shuffle 操作 - clone.set(CommonConstant.LOAD_BALANCE_RESOURCE_MARK, DataBaseType.parseIpFromJdbcUrl(jdbcUrl)); - clone.remove(Key.CONNECTION); - - configurations.add(clone); - LOG.info("Configuration: {}", JSON.toJSONString(clone)); - } + List connectionList = this.originalConfig.getList(Key.CONNECTION); + for (Object conn : connectionList) { + Configuration clone = this.originalConfig.clone(); + Configuration conf = Configuration.from(conn.toString()); + String jdbcUrl = conf.getString(Key.JDBC_URL); + clone.set(Key.JDBC_URL, jdbcUrl); + clone.set(Key.TABLE, conf.getList(Key.TABLE)); + clone.remove(Key.CONNECTION); + configurations.add(clone); } + + LOG.info("Configuration: {}", configurations); return configurations; } } public static class Task extends Reader.Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); - private Connection conn; - private Long startTime; - private Long endTime; + private Configuration readerSliceConfig; + private Connection conn; private List tables; private List columns; - private String mandatoryEncoding; + + private String startTime; + private String endTime; @Override public void init() { - Configuration readerSliceConfig = super.getPluginJobConf(); - LOG.info("getPluginJobConf: {}", JSON.toJSONString(readerSliceConfig)); + this.readerSliceConfig = super.getPluginJobConf(); + LOG.info("getPluginJobConf: {}", readerSliceConfig); String url = readerSliceConfig.getString(Key.JDBC_URL); if (StringUtils.isBlank(url)) throw DataXException.asDataXException(TDengineReaderErrorCode.REQUIRED_VALUE, "The parameter [" + Key.JDBC_URL + "] is not set."); - - tables = readerSliceConfig.getList(Key.TABLE, String.class); - columns = readerSliceConfig.getList(Key.COLUMN, String.class); - - String user = readerSliceConfig.getString(Key.USER); + String user = readerSliceConfig.getString(Key.USERNAME); String password = readerSliceConfig.getString(Key.PASSWORD); - try { - conn = DriverManager.getConnection(url, user, password); + this.conn = DriverManager.getConnection(url, user, password); } catch (SQLException e) { throw DataXException.asDataXException(TDengineReaderErrorCode.CONNECTION_FAILED, - "The parameter [" + Key.JDBC_URL + "] : " + url + " failed to connect since: " + e.getMessage()); + "The parameter [" + Key.JDBC_URL + "] : " + url + " failed to connect since: " + e.getMessage(), e); } - this.mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, ""); - - startTime = readerSliceConfig.getLong(Key.BEGIN_DATETIME); - endTime = readerSliceConfig.getLong(Key.END_DATETIME); + this.tables = readerSliceConfig.getList(Key.TABLE, String.class); + this.columns = readerSliceConfig.getList(Key.COLUMN, String.class); + this.startTime = readerSliceConfig.getString(Key.BEGIN_DATETIME); + this.endTime = readerSliceConfig.getString(Key.END_DATETIME); } - @Override public void destroy() { @@ -189,18 +160,31 @@ public class TDengineReader extends Reader { @Override public void startRead(RecordSender recordSender) { + try (Statement stmt = conn.createStatement()) { for (String table : tables) { - String sql = "select " + StringUtils.join(columns, ",") + " from " + table - + " where _c0 >= " + startTime + " and _c0 < " + endTime; + StringBuilder sb = new StringBuilder(); + sb.append("select ").append(StringUtils.join(columns, ",")) + .append(" from ").append(table).append(" "); + + if (StringUtils.isBlank(startTime)) { + sb.append("where _c0 >= ").append(Long.MIN_VALUE); + } else { + sb.append("where _c0 >= '").append(startTime).append("'"); + } + if (!StringUtils.isBlank(endTime)) { + sb.append(" and _c0 < '").append(endTime).append("'"); + } + + String sql = sb.toString(); ResultSet rs = stmt.executeQuery(sql); - ResultSetMetaData metaData = rs.getMetaData(); while (rs.next()) { - transportOneRecord(recordSender, rs, metaData, metaData.getColumnCount(), this.mandatoryEncoding); + Record record = buildRecord(recordSender, rs, "UTF-8"); + recordSender.sendToWriter(record); } } } catch (SQLException e) { - throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错!", e); + throw DataXException.asDataXException(TDengineReaderErrorCode.RUNTIME_EXCEPTION, e.getMessage(), e); } finally { try { if (conn != null) @@ -211,17 +195,11 @@ public class TDengineReader extends Reader { } } - private Record transportOneRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnCount, String mandatoryEncoding) { - Record record = buildRecord(recordSender, rs, metaData, columnCount, mandatoryEncoding); - recordSender.sendToWriter(record); - return record; - } - - private Record buildRecord(RecordSender recordSender, ResultSet rs, ResultSetMetaData metaData, int columnCount, String mandatoryEncoding) { + private Record buildRecord(RecordSender recordSender, ResultSet rs, String mandatoryEncoding) { Record record = recordSender.createRecord(); - try { - for (int i = 1; i <= columnCount; i++) { + ResultSetMetaData metaData = rs.getMetaData(); + for (int i = 1; i <= metaData.getColumnCount(); i++) { int columnType = metaData.getColumnType(i); switch (columnType) { case Types.SMALLINT: @@ -261,16 +239,16 @@ public class TDengineReader extends Reader { } } - private static final long second = 1000; - private static final long minute = 60 * second; - private static final long hour = 60 * minute; - private static final long day = 24 * hour; - private static Long parseSplitInterval(String splitInterval) throws Exception { + final long second = 1000; + final long minute = 60 * second; + final long hour = 60 * minute; + final long day = 24 * hour; + Pattern compile = Pattern.compile("^(\\d+)([dhms])$"); Matcher matcher = compile.matcher(splitInterval); while (matcher.find()) { - Long value = Long.valueOf(matcher.group(1)); + long value = Long.parseLong(matcher.group(1)); if (value == 0) throw new Exception("invalid splitInterval: 0"); char unit = matcher.group(2).charAt(0); diff --git a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReaderErrorCode.java b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReaderErrorCode.java index 68bc11e7..b784ab06 100644 --- a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReaderErrorCode.java +++ b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReaderErrorCode.java @@ -4,9 +4,10 @@ import com.alibaba.datax.common.spi.ErrorCode; public enum TDengineReaderErrorCode implements ErrorCode { - REQUIRED_VALUE("TDengineReader-00", "缺失必要的值"), - ILLEGAL_VALUE("TDengineReader-01", "值非法"), - CONNECTION_FAILED("TDengineReader-02", "连接错误"); + REQUIRED_VALUE("TDengineReader-00", "parameter value is missing"), + ILLEGAL_VALUE("TDengineReader-01", "invalid parameter value"), + CONNECTION_FAILED("TDengineReader-02", "connection error"), + RUNTIME_EXCEPTION("TDengineWriter-03", "runtime exception"); private final String code; private final String description; diff --git a/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengine2DMTest.java b/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengine2DMTest.java new file mode 100644 index 00000000..d9099e01 --- /dev/null +++ b/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengine2DMTest.java @@ -0,0 +1,87 @@ +package com.alibaba.datax.plugin.reader; + +import com.alibaba.datax.core.Engine; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.Random; + +@Ignore +public class TDengine2DMTest { + private static final String host1 = "192.168.56.105"; + private static final String host2 = "192.168.0.72"; + + private final Random random = new Random(System.currentTimeMillis()); + + @Test + public void t2dm_case01() throws Throwable { + // given + createSupTable("ms"); + + // when + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2dm.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + + @Test + public void t2dm_case02() throws Throwable { + // given + createSupTable("us"); + + // when + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2dm.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + + @Test + public void t2dm_case03() throws Throwable { + // given + createSupTable("ns"); + + // when + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2dm.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + + private void createSupTable(String precision) throws SQLException { + final String url = "jdbc:TAOS-RS://" + host1 + ":6041/"; + try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) { + Statement stmt = conn.createStatement(); + + stmt.execute("drop database if exists db1"); + stmt.execute("create database if not exists db1 precision '" + precision + "'"); + stmt.execute("create table db1.stb1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, " + + "f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, t3 smallint, " + + "t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(100))"); + + for (int i = 1; i <= 10; i++) { + stmt.execute("insert into db1.tb" + i + " using db1.stb1 tags(now, " + random.nextInt(10) + "," + + random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextInt(10) + "," + + random.nextFloat() + "," + random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123'," + + "'北京朝阳望京') values(now+" + i + "s, " + random.nextInt(10) + "," + random.nextInt(10) + "," + + +random.nextInt(10) + "," + random.nextInt(10) + "," + random.nextFloat() + "," + + random.nextDouble() + "," + random.nextBoolean() + ",'abcABC123','北京朝阳望京')"); + } + stmt.close(); + } + + final String url2 = "jdbc:dm://" + host2 + ":5236"; + try (Connection conn = DriverManager.getConnection(url2, "TESTUSER", "test123456")) { + conn.setAutoCommit(true); + Statement stmt = conn.createStatement(); + stmt.execute("drop table if exists stb2"); + stmt.execute("create table stb2(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, " + + "f6 double, f7 BIT, f8 VARCHAR(100), f9 VARCHAR2(200), t1 timestamp, t2 tinyint, t3 smallint, " + + "t4 int, t5 bigint, t6 float, t7 double, t8 BIT, t9 VARCHAR(100), t10 VARCHAR2(200))"); + } + } + +} diff --git a/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengineReaderTest.java b/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengineReaderTest.java index d51d4229..da980c5f 100644 --- a/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengineReaderTest.java +++ b/tdenginereader/src/test/java/com/alibaba/datax/plugin/reader/TDengineReaderTest.java @@ -1,72 +1,118 @@ package com.alibaba.datax.plugin.reader; import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.writer.tdenginewriter.Key; import org.junit.Assert; -import org.junit.Before; import org.junit.Test; -import java.text.ParseException; -import java.text.SimpleDateFormat; import java.util.List; public class TDengineReaderTest { - TDengineReader.Job job; - - @Before - public void before() { - job = new TDengineReader.Job(); + @Test + public void jobInit_case01() { + // given + TDengineReader.Job job = new TDengineReader.Job(); Configuration configuration = Configuration.from("{" + - "\"user\": \"root\"," + + "\"username\": \"root\"," + "\"password\": \"taosdata\"," + "\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," + "\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," + "\"beginDateTime\": \"2021-01-01 00:00:00\"," + - "\"endDateTime\": \"2021-01-01 12:00:00\"," + - "\"splitInterval\": \"1h\"" + + "\"endDateTime\": \"2021-01-01 12:00:00\"" + "}"); job.setPluginJobConf(configuration); - } - @Test - public void jobInit() throws ParseException { // when job.init(); // assert Configuration conf = job.getPluginJobConf(); - Assert.assertEquals("root", conf.getString("user")); + Assert.assertEquals("root", conf.getString(Key.USERNAME)); Assert.assertEquals("taosdata", conf.getString("password")); Assert.assertEquals("weather", conf.getString("connection[0].table[0]")); Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl")); + Assert.assertEquals("2021-01-01 00:00:00", conf.getString("beginDateTime")); + Assert.assertEquals("2021-01-01 12:00:00", conf.getString("endDateTime")); + } - SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss"); - Long start = sdf.parse("2021-01-01 00:00:00").getTime(); - Assert.assertEquals(start, conf.getLong("beginDateTime")); + @Test + public void jobInit_case02() { + // given + TDengineReader.Job job = new TDengineReader.Job(); + Configuration configuration = Configuration.from("{" + + "\"username\": \"root\"," + + "\"password\": \"taosdata\"," + + "\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," + + "\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]" + + "}"); + job.setPluginJobConf(configuration); - Long end = sdf.parse("2021-01-01 12:00:00").getTime(); - Assert.assertEquals(end, conf.getLong("endDateTime")); + // when + job.init(); - Assert.assertEquals(new Long(3600 * 1000), conf.getLong("splitInterval")); + // assert + Configuration conf = job.getPluginJobConf(); + + Assert.assertEquals("root", conf.getString(Key.USERNAME)); + Assert.assertEquals("taosdata", conf.getString("password")); + Assert.assertEquals("weather", conf.getString("connection[0].table[0]")); + Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl")); } @Test - public void jobSplit() { + public void jobSplit_case01() { + // given + TDengineReader.Job job = new TDengineReader.Job(); + Configuration configuration = Configuration.from("{" + + "\"username\": \"root\"," + + "\"password\": \"taosdata\"," + + "\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," + + "\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," + + "\"beginDateTime\": \"2021-01-01 00:00:00\"," + + "\"endDateTime\": \"2021-01-01 12:00:00\"" + + "}"); + job.setPluginJobConf(configuration); + // when job.init(); List configurationList = job.split(1); // assert - Assert.assertEquals(12, configurationList.size()); - for (int i = 0; i < configurationList.size(); i++) { - Configuration conf = configurationList.get(i); - Assert.assertEquals("root", conf.getString("user")); - Assert.assertEquals("taosdata", conf.getString("password")); - Assert.assertEquals("weather", conf.getString("table[0]")); - Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl")); - } + Assert.assertEquals(1, configurationList.size()); + Configuration conf = configurationList.get(0); + Assert.assertEquals("root", conf.getString("username")); + Assert.assertEquals("taosdata", conf.getString("password")); + Assert.assertEquals("weather", conf.getString("table[0]")); + Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl")); + + } + + @Test + public void jobSplit_case02() { + // given + TDengineReader.Job job = new TDengineReader.Job(); + Configuration configuration = Configuration.from("{" + + "\"username\": \"root\"," + + "\"password\": \"taosdata\"," + + "\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," + + "\"column\": [\"ts\",\"current\",\"voltage\",\"phase\"]," + + "}"); + job.setPluginJobConf(configuration); + + // when + job.init(); + List configurationList = job.split(1); + + // assert + Assert.assertEquals(1, configurationList.size()); + Configuration conf = configurationList.get(0); + Assert.assertEquals("root", conf.getString("username")); + Assert.assertEquals("taosdata", conf.getString("password")); + Assert.assertEquals("weather", conf.getString("table[0]")); + Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl")); } } \ No newline at end of file diff --git a/tdenginereader/src/test/resources/t2dm.json b/tdenginereader/src/test/resources/t2dm.json new file mode 100644 index 00000000..b2cf91e2 --- /dev/null +++ b/tdenginereader/src/test/resources/t2dm.json @@ -0,0 +1,50 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "tdenginereader", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "*" + ], + "connection": [ + { + "table": [ + "stb1" + ], + "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1" + } + ] + } + }, + "writer": { + "name": "rdbmswriter", + "parameter": { + "connection": [ + { + "table": [ + "stb2" + ], + "jdbcUrl": "jdbc:dm://192.168.0.72:5236" + } + ], + "username": "TESTUSER", + "password": "test123456", + "table": "stb2", + "column": [ + "*" + ] + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginewriter/pom.xml b/tdenginewriter/pom.xml index 97c750db..791a4bdc 100644 --- a/tdenginewriter/pom.xml +++ b/tdenginewriter/pom.xml @@ -75,14 +75,14 @@ test - - com.dameng - dm-jdbc - 1.8 - system - ${project.basedir}/src/test/resources/DmJdbcDriver18.jar - - + + + + + + + + diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java index 4068be42..a8196153 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java @@ -19,6 +19,15 @@ import java.util.Date; import java.util.stream.Collectors; public class DefaultDataHandler implements DataHandler { + static { + try { + Class.forName("com.taosdata.jdbc.TSDBDriver"); + Class.forName("com.taosdata.jdbc.rs.RestfulDriver"); + } catch (ClassNotFoundException e) { + e.printStackTrace(); + } + } + private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class); private String username; @@ -62,6 +71,7 @@ public class DefaultDataHandler implements DataHandler { int count = 0; int affectedRows = 0; + try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) { LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established."); // prepare table_name -> table_meta diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java index 1d7ee214..b95ff584 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java @@ -9,4 +9,7 @@ public class Key { public static final String JDBC_URL = "jdbcUrl"; public static final String COLUMN = "column"; public static final String IGNORE_TAGS_UNMATCHED = "ignoreTagsUnmatched"; + + public static final String BEGIN_DATETIME = "beginDateTime"; + public static final String END_DATETIME = "endDateTime"; } \ No newline at end of file diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java index 63d67a2a..c48b7942 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java @@ -17,7 +17,7 @@ public class SchemaManager { private final Connection conn; private TimestampPrecision precision; - SchemaManager(Connection conn) { + public SchemaManager(Connection conn) { this.conn = conn; } diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java index 7cc76a77..eb538022 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java @@ -25,7 +25,7 @@ public class TDengineWriter extends Writer { this.originalConfig = super.getPluginJobConf(); this.originalConfig.set(PEER_PLUGIN_NAME, getPeerPluginName()); - // check user + // check username String user = this.originalConfig.getString(Key.USERNAME); if (StringUtils.isBlank(user)) throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter [" diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DM2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DM2TDengineTest.java index 0eb91deb..15f6b1bc 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DM2TDengineTest.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DM2TDengineTest.java @@ -99,7 +99,7 @@ public class DM2TDengineTest { } @Before - public void before() throws SQLException, ClassNotFoundException { + public void before() throws SQLException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); long ts = System.currentTimeMillis(); diff --git a/tdenginewriter/src/test/resources/incremental_sync/clean_env.sh b/tdenginewriter/src/test/resources/incremental_sync/clean_env.sh new file mode 100755 index 00000000..f3dca7c1 --- /dev/null +++ b/tdenginewriter/src/test/resources/incremental_sync/clean_env.sh @@ -0,0 +1,9 @@ +#!/bin/bash + +datax_home_dir=$(dirname $(readlink -f "$0")) + +curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'drop table if exists db2.stb2;' 192.168.1.93:6041/rest/sql +curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'create table if not exists db2.stb2 (`ts` TIMESTAMP,`f2` SMALLINT,`f4` BIGINT,`f5` FLOAT,`f6` DOUBLE,`f7` DOUBLE,`f8` BOOL,`f9` NCHAR(100),`f10` NCHAR(200)) TAGS (`f1` TINYINT,`f3` INT);' 192.168.1.93:6041/rest/sql + +rm -f ${datax_home_dir}/log/* +rm -f ${datax_home_dir}/job/*.csv \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/incremental_sync/dm2t-update.json b/tdenginewriter/src/test/resources/incremental_sync/dm2t-update.json new file mode 100644 index 00000000..d9285b23 --- /dev/null +++ b/tdenginewriter/src/test/resources/incremental_sync/dm2t-update.json @@ -0,0 +1,63 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "rdbmsreader", + "parameter": { + "username": "TESTUSER", + "password": "test123456", + "connection": [ + { + "querySql": [ + "select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1" + ], + "jdbcUrl": [ + "jdbc:dm://192.168.0.72:5236" + ] + } + ], + "where": "1=1", + "fetchSize": 1024 + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "tbname", + "ts", + "f1", + "f2", + "f3", + "f4", + "f5", + "f6", + "f7", + "f8", + "f9", + "f10" + ], + "connection": [ + { + "table": [ + "stb2" + ], + "jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2" + } + ], + "batchSize": 1000, + "ignoreTagsUnmatched": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/incremental_sync/dm2t_sync.sh b/tdenginewriter/src/test/resources/incremental_sync/dm2t_sync.sh new file mode 100755 index 00000000..426c6233 --- /dev/null +++ b/tdenginewriter/src/test/resources/incremental_sync/dm2t_sync.sh @@ -0,0 +1,57 @@ +#!/bin/bash + +set -e +#set -x + +datax_home_dir=$(dirname $(readlink -f "$0")) +table_name="stb1" +update_key="ts" + +while getopts "hd:t:" arg; do + case $arg in + d) + datax_home_dir=$(echo $OPTARG) + ;; + v) + table_name=$(echo $OPTARG) + ;; + h) + echo "Usage: $(basename $0) -d [datax_home_dir] -t [table_name] -k [update_key]" + echo " -h help" + exit 0 + ;; + ?) #unknow option + echo "unkonw argument" + exit 1 + ;; + esac +done + +if [[ -e ${datax_home_dir}/job/${table_name}.csv ]]; then + MAX_TIME=$(cat ${datax_home_dir}/job/${table_name}.csv) +else + MAX_TIME="null" +fi +current_datetime=$(date +"%Y-%m-%d %H:%M:%S") +current_timestamp=$(date +%s) + +if [ "$MAX_TIME" != "null" ]; then + WHERE="${update_key} >= '$MAX_TIME' and ${update_key} < '$current_datetime'" + sed "s/1=1/$WHERE/g" ${datax_home_dir}/job/dm2t-update.json >${datax_home_dir}/job/dm2t_${current_timestamp}.json + echo "incremental data synchronization, from '${MAX_TIME}' to '${current_datetime}'" + python ${datax_home_dir}/bin/datax.py ${datax_home_dir}/job/dm2t_${current_timestamp}.json 1> /dev/null 2>&1 +else + echo "full data synchronization, to '${current_datetime}'" + python ${datax_home_dir}/bin/datax.py ${datax_home_dir}/job/dm2t-update.json 1> /dev/null 2>&1 +fi + +if [[ $? -ne 0 ]]; then + echo "datax migration job falied" +else + echo ${current_datetime} >$datax_home_dir/job/${table_name}.csv + echo "datax migration job success" +fi + +rm -rf ${datax_home_dir}/job/dm2t_${current_timestamp}.json + +#while true; do ./dm2t_sync.sh; sleep 5s; done \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/incremental_sync/upload.sh b/tdenginewriter/src/test/resources/incremental_sync/upload.sh new file mode 100755 index 00000000..c7d11ca1 --- /dev/null +++ b/tdenginewriter/src/test/resources/incremental_sync/upload.sh @@ -0,0 +1,5 @@ +#!/bin/bash + +scp dm2t-update.json root@192.168.56.105:/root/workspace/tmp/datax/job +scp dm2t_sync.sh root@192.168.56.105:/root/workspace/tmp/datax +scp clean_env.sh root@192.168.56.105:/root/workspace/tmp/datax \ No newline at end of file