diff --git a/tdenginewriter/pom.xml b/tdenginewriter/pom.xml index 720ecbb8..459ad0b7 100644 --- a/tdenginewriter/pom.xml +++ b/tdenginewriter/pom.xml @@ -38,6 +38,12 @@ + + org.apache.commons + commons-lang3 + ${commons-lang3-version} + + com.alibaba.datax datax-common @@ -63,10 +69,12 @@ test - org.apache.commons - commons-lang3 - ${commons-lang3-version} + mysql + mysql-connector-java + 5.1.49 + test + diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java index 7afcb080..f22d4d6c 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java @@ -4,6 +4,5 @@ import com.alibaba.datax.common.plugin.RecordReceiver; import com.alibaba.datax.common.plugin.TaskPluginCollector; public interface DataHandler { - int handle(RecordReceiver lineReceiver, TaskPluginCollector collector); } diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java index 16be4c90..addda9cf 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java @@ -14,10 +14,8 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.*; -import java.util.ArrayList; +import java.util.*; import java.util.Date; -import java.util.List; -import java.util.Map; import java.util.stream.Collectors; public class DefaultDataHandler implements DataHandler { @@ -83,6 +81,11 @@ public class DefaultDataHandler implements DataHandler { } count++; } + + if (!recordBatch.isEmpty()) { + affectedRows = writeBatch(conn, recordBatch); + recordBatch.clear(); + } } catch (SQLException e) { throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage()); } @@ -302,15 +305,6 @@ public class DefaultDataHandler implements DataHandler { if (colMeta.type.equals("TIMESTAMP")) return dateAsLong(column) + "i64"; return "L'" + column.asString() + "'"; - case BYTES: - case STRING: - if (colMeta.type.equals("TIMESTAMP")) - return column.asString() + "i64"; - String value = column.asString(); - if (colMeta.type.startsWith("BINARY")) - return "'" + Utils.escapeSingleQuota(value) + "'"; - if (colMeta.type.startsWith("NCHAR")) - return "L'" + Utils.escapeSingleQuota(value) + "'"; case NULL: case BAD: return "NULL"; @@ -331,6 +325,16 @@ public class DefaultDataHandler implements DataHandler { if (colMeta.type.equals("BIGINT")) return column.asString() + "i64"; } + case BYTES: + case STRING: + if (colMeta.type.equals("TIMESTAMP")) + return column.asString() + "i64"; + String value = column.asString(); + value = value.replace("\"", "\\\""); + if (colMeta.type.startsWith("BINARY")) + return "\"" + value + "\""; + if (colMeta.type.startsWith("NCHAR")) + return "L\"" + value + "\""; case BOOL: default: return column.asString(); @@ -363,7 +367,9 @@ public class DefaultDataHandler implements DataHandler { boolean tagsAllMatch = columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { return colMeta.isTag; }).allMatch(colMeta -> { - return record.getColumn(indexOf(colMeta.field)).asString().equals(colMeta.value.toString()); + Column column = record.getColumn(indexOf(colMeta.field)); + boolean equals = equals(column, colMeta); + return equals; }); if (ignoreTagsUnmatched && !tagsAllMatch) @@ -386,6 +392,28 @@ public class DefaultDataHandler implements DataHandler { return executeUpdate(conn, sql); } + private boolean equals(Column column, ColumnMeta colMeta) { + switch (column.getType()) { + case BOOL: + return column.asBoolean().equals(Boolean.valueOf(colMeta.value.toString())); + case INT: + case LONG: + return column.asLong().equals(Long.valueOf(colMeta.value.toString())); + case DOUBLE: + return column.asDouble().equals(Double.valueOf(colMeta.value.toString())); + case NULL: + return colMeta.value == null; + case DATE: + return column.asDate().getTime() == ((Timestamp) colMeta.value).getTime(); + case BAD: + case BYTES: + return Arrays.equals(column.asBytes(), (byte[]) colMeta.value); + case STRING: + default: + return column.asString().equals(colMeta.value.toString()); + } + } + /** * table: ["weather"], column: ["ts, f1, f2, f3, t1, t2"] * sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...) diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Msg.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Msg.java deleted file mode 100644 index 89730d35..00000000 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Msg.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.alibaba.datax.plugin.writer.tdenginewriter; - -import java.util.Locale; -import java.util.ResourceBundle; - -/** - * i18n message util - */ -public class Msg { - private static ResourceBundle bundle; - - static { - bundle = ResourceBundle.getBundle("tdenginewritermsg", Locale.getDefault()); - } - - public static String get(String key) { - return bundle.getString(key); - } - -} diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java index 877b9b6d..63d67a2a 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java @@ -81,7 +81,7 @@ public class SchemaManager { for (String tbname : tables) { if (!tableMetas.containsKey(tbname)) { - LOG.error("table metadata of " + tbname + " is empty!"); + throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, "table metadata of " + tbname + " is empty!"); } } } catch (SQLException e) { diff --git a/tdenginewriter/src/main/resources/plugin.json b/tdenginewriter/src/main/resources/plugin.json index e54f65ff..10d8e2cf 100644 --- a/tdenginewriter/src/main/resources/plugin.json +++ b/tdenginewriter/src/main/resources/plugin.json @@ -3,7 +3,7 @@ "class": "com.alibaba.datax.plugin.writer.tdenginewriter.TDengineWriter", "description": { "useScene": "data migration to tdengine", - "mechanism": "use JNI or taos-jdbc to write data to tdengine." + "mechanism": "use taos-jdbcdriver to write data." }, - "developer": "zyyang-taosdata" + "developer": "support@taosdata.com" } \ No newline at end of file diff --git a/tdenginewriter/src/main/resources/plugin_job_template.json b/tdenginewriter/src/main/resources/plugin_job_template.json index 39c9c969..3d303305 100644 --- a/tdenginewriter/src/main/resources/plugin_job_template.json +++ b/tdenginewriter/src/main/resources/plugin_job_template.json @@ -1,24 +1,20 @@ { - "name": "tdenginewriter", - "parameter": { - "host": "127.0.0.1", - "port": 6030, - "dbname": "test", - "user": "root", - "password": "taosdata", - "batchSize": 1000, - "stable": "weather", - "tagColumn": { - "station": 0 - }, - "fieldColumn": { - "latitude": 1, - "longtitude": 2, - "tmax": 4, - "tmin": 5 - }, - "timestampColumn":{ - "date": 3 - } - } + "name": "tdenginewriter", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "" + ], + "connection": [ + { + "table": [ + "" + ], + "jdbcUrl": "" + } + ], + "batchSize": 1000, + "ignoreTagsUnmatched": true + } } \ No newline at end of file diff --git a/tdenginewriter/src/main/resources/tdenginewritermsg.properties b/tdenginewriter/src/main/resources/tdenginewritermsg.properties deleted file mode 100644 index 48d862e7..00000000 --- a/tdenginewriter/src/main/resources/tdenginewritermsg.properties +++ /dev/null @@ -1,6 +0,0 @@ -try_get_schema_from_db=fail to get structure info of target table from configure file and will try to get it from database -batch_size_too_small='batchSize' is too small, please increase it and try again -column_number_error=number of columns is less than expected -tag_value_error=tag columns include 'null' value -ts_value_error=timestamp column type error or null -infer_column_type_error=fail to infer column type: sample count %d, column index %d \ No newline at end of file diff --git a/tdenginewriter/src/main/resources/tdenginewritermsg_en_US.properties b/tdenginewriter/src/main/resources/tdenginewritermsg_en_US.properties deleted file mode 100644 index 48d862e7..00000000 --- a/tdenginewriter/src/main/resources/tdenginewritermsg_en_US.properties +++ /dev/null @@ -1,6 +0,0 @@ -try_get_schema_from_db=fail to get structure info of target table from configure file and will try to get it from database -batch_size_too_small='batchSize' is too small, please increase it and try again -column_number_error=number of columns is less than expected -tag_value_error=tag columns include 'null' value -ts_value_error=timestamp column type error or null -infer_column_type_error=fail to infer column type: sample count %d, column index %d \ No newline at end of file diff --git a/tdenginewriter/src/main/resources/tdenginewritermsg_zh_CN.properties b/tdenginewriter/src/main/resources/tdenginewritermsg_zh_CN.properties deleted file mode 100644 index 38d2cbb6..00000000 --- a/tdenginewriter/src/main/resources/tdenginewritermsg_zh_CN.properties +++ /dev/null @@ -1,6 +0,0 @@ -try_get_schema_from_db=\u65e0\u6cd5\u4ece\u914d\u7f6e\u6587\u4ef6\u83b7\u53d6\u8868\u7ed3\u6784\u4fe1\u606f\uff0c\u5c1d\u8bd5\u4ece\u6570\u636e\u5e93\u83b7\u53d6 -batch_size_too_small=batchSize\u592a\u5c0f\uff0c\u4f1a\u589e\u52a0\u81ea\u52a8\u7c7b\u578b\u63a8\u65ad\u9519\u8bef\u7684\u6982\u7387\uff0c\u5efa\u8bae\u6539\u5927\u540e\u91cd\u8bd5 -column_number_error=\u5b9e\u9645\u5217\u6570\u5c0f\u4e8e\u671f\u671b\u5217\u6570 -tag_value_error=\u6807\u7b7e\u5217\u5305\u542bnull -ts_value_error=\u65f6\u95f4\u6233\u5217\u4e3anull\u6216\u7c7b\u578b\u9519\u8bef -infer_column_type_error=\u6839\u636e\u91c7\u6837\u7684%d\u6761\u6570\u636e\uff0c\u65e0\u6cd5\u63a8\u65ad\u7b2c%d\u5217\u7684\u6570\u636e\u7c7b\u578b diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest2.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest2.java index 48baad1d..1ed32d63 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest2.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest2.java @@ -27,6 +27,8 @@ public class DefaultDataHandlerTest2 { @Test public void writeSupTableBySchemaless() throws SQLException { + + // given Configuration configuration = Configuration.from("{" + "\"username\": \"root\"," + diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/EngineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/EngineTest.java deleted file mode 100644 index 985c8047..00000000 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/EngineTest.java +++ /dev/null @@ -1,65 +0,0 @@ -package com.alibaba.datax.plugin.writer.tdenginewriter; - -import com.alibaba.datax.core.Engine; -import org.junit.Assert; -import org.junit.Test; - -import java.sql.*; - -public class EngineTest { - - @Test - public void opentsdb2tdengine() throws SQLException { - // when - String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/opentsdb2tdengine.json"}; - System.setProperty("datax.home", "../target/datax/datax"); - try { - Engine.entry(params); - } catch (Throwable e) { - e.printStackTrace(); - } - - // assert - String jdbcUrl = "jdbc:TAOS://192.168.56.105:6030/test?timestampFormat=TIMESTAMP"; - try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) { - Statement stmt = conn.createStatement(); - ResultSet rs = stmt.executeQuery("select count(*) from weather_temperature"); - int rows = 0; - while (rs.next()) { - rows = rs.getInt("count(*)"); - } - Assert.assertEquals(5, rows); - stmt.close(); - } - } - - @Test - public void mysql2tdengine() { - // given - // MYSQL SQL: - // create table t(id int primary key AUTO_INCREMENT, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, f6 double,ts timestamp, dt datetime,f7 nchar(64)) - // insert into t(f1,f2,f3,f4,f5,f6,ts,dt,f7) values(1,2,3,4,5,6,'2022-01-28 12:00:00','2022-01-28 12:00:00', 'beijing'); - // TDengine SQL: - // create table t(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, f6 double, dt timestamp,f7 nchar(64)); - - // when - String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/mysql2tdengine.json"}; - System.setProperty("datax.home", "../target/datax/datax"); - try { - Engine.entry(params); - } catch (Throwable e) { - e.printStackTrace(); - } - } - - @Test - public void tdengine2tdengine() { - String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/tdengine2tdengine.json"}; - System.setProperty("datax.home", "../target/datax/datax"); - try { - Engine.entry(params); - } catch (Throwable e) { - e.printStackTrace(); - } - } -} diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/MessageTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/MessageTest.java deleted file mode 100644 index b1b7ddd8..00000000 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/MessageTest.java +++ /dev/null @@ -1,25 +0,0 @@ -package com.alibaba.datax.plugin.writer.tdenginewriter; - -import org.junit.Test; - -import java.util.Locale; -import java.util.ResourceBundle; - -import org.junit.Assert; - -public class MessageTest { - @Test - public void testChineseMessage() { - Locale local = new Locale("zh", "CN"); - ResourceBundle bundle = ResourceBundle.getBundle("tdenginewritermsg", local); - String msg = bundle.getString("try_get_schema_fromdb"); - Assert.assertEquals("无法从配置文件获取表结构信息,尝试从数据库获取", msg); - } - - @Test - public void testDefaultMessage() { - ResourceBundle bundle = ResourceBundle.getBundle("tdenginewritermsg", Locale.getDefault()); - String msg = bundle.getString("try_get_schema_fromdb"); - System.out.println(msg); - } -} diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest.java new file mode 100644 index 00000000..4a662711 --- /dev/null +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest.java @@ -0,0 +1,70 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import com.alibaba.datax.core.Engine; +import org.junit.Before; +import org.junit.Test; + +import java.sql.*; +import java.text.SimpleDateFormat; +import java.util.Random; + +public class Mysql2TDengineTest { + + private static final String host1 = "192.168.56.105"; + private static final String host2 = "192.168.1.93"; + private static final Random random = new Random(System.currentTimeMillis()); + + @Test + public void mysql2tdengine() throws Throwable { + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/m2t-1.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + + @Before + public void before() throws SQLException { + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + String ts = sdf.format(new Date(System.currentTimeMillis())); + + final String url = "jdbc:mysql://" + host1 + ":3306/?useSSL=false&useUnicode=true&charset=UTF-8&generateSimpleParameterMetadata=true"; + try (Connection conn = DriverManager.getConnection(url, "root", "123456")) { + Statement stmt = conn.createStatement(); + + stmt.execute("drop database if exists db1"); + stmt.execute("create database if not exists db1"); + stmt.execute("use db1"); + stmt.execute("create table stb1(id int primary key AUTO_INCREMENT, " + + "f1 tinyint, f2 smallint, f3 int, f4 bigint, " + + "f5 float, f6 double, " + + "ts timestamp, dt datetime," + + "f7 nchar(100), f8 varchar(100))"); + for (int i = 1; i <= 10; i++) { + String sql = "insert into stb1(f1, f2, f3, f4, f5, f6, ts, dt, f7, f8) values(" + + i + "," + random.nextInt(100) + "," + random.nextInt(100) + "," + random.nextInt(100) + "," + + random.nextFloat() + "," + random.nextDouble() + ", " + + "'" + ts + "', '" + ts + "', " + + "'中国北京朝阳望京abc', '中国北京朝阳望京adc')"; + stmt.execute(sql); + } + + stmt.close(); + } + + final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041/"; + try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) { + Statement stmt = conn.createStatement(); + + stmt.execute("drop database if exists db2"); + stmt.execute("create database if not exists db2"); + stmt.execute("create table db2.stb2(" + + "ts timestamp, dt timestamp, " + + "f1 tinyint, f2 smallint, f3 int, f4 bigint, " + + "f5 float, f6 double, " + + "f7 nchar(100), f8 nchar(100))"); + + stmt.close(); + } + + } + +} diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Opentsdb2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Opentsdb2TDengineTest.java new file mode 100644 index 00000000..879eec26 --- /dev/null +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Opentsdb2TDengineTest.java @@ -0,0 +1,36 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import com.alibaba.datax.core.Engine; +import org.junit.Assert; +import org.junit.Test; + +import java.sql.*; + +public class Opentsdb2TDengineTest { + + @Test + public void opentsdb2tdengine() throws SQLException { + // when + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/opentsdb2tdengine.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + try { + Engine.entry(params); + } catch (Throwable e) { + e.printStackTrace(); + } + + // assert + String jdbcUrl = "jdbc:TAOS://192.168.56.105:6030/test?timestampFormat=TIMESTAMP"; + try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) { + Statement stmt = conn.createStatement(); + ResultSet rs = stmt.executeQuery("select count(*) from weather_temperature"); + int rows = 0; + while (rs.next()) { + rows = rs.getInt("count(*)"); + } + Assert.assertEquals(5, rows); + stmt.close(); + } + } + +} diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Rdbms2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Rdbms2TDengineTest.java new file mode 100644 index 00000000..6a1170ea --- /dev/null +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Rdbms2TDengineTest.java @@ -0,0 +1,4 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +public class Rdbms2TDengineTest { +} diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengine2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengine2TDengineTest.java new file mode 100644 index 00000000..0110be48 --- /dev/null +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengine2TDengineTest.java @@ -0,0 +1,68 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import com.alibaba.datax.core.Engine; +import org.junit.Before; +import org.junit.Test; + +import java.sql.*; +import java.util.Random; + +public class TDengine2TDengineTest { + + private static final String host1 = "192.168.56.105"; + private static final String host2 = "192.168.1.93"; + private static final String db1 = "db1"; + private static final String stb1 = "stb1"; + private static final String db2 = "db2"; + private static final String stb2 = "stb2"; + private static Random random = new Random(System.currentTimeMillis()); + + @Test + public void case_01() throws Throwable { + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2t-1.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + + @Test + public void case_02() throws Throwable { + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2t-2.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + + @Before + public void before() throws SQLException { + final String url = "jdbc:TAOS-RS://" + host1 + ":6041"; + try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) { + Statement stmt = conn.createStatement(); + + stmt.execute("drop database if exists " + db1); + stmt.execute("create database if not exists " + db1); + stmt.execute("create table " + db1 + "." + stb1 + " (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint," + + " f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, " + + "t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(1000))"); + for (int i = 0; i < 10; i++) { + String sql = "insert into " + db1 + ".t" + (i + 1) + " using " + db1 + "." + stb1 + " tags(now+" + i + "s," + + random.nextInt(100) + "," + random.nextInt(100) + "," + random.nextInt(100) + "," + + random.nextInt(100) + "," + random.nextFloat() + "," + random.nextDouble() + "," + + random.nextBoolean() + ",'abc123ABC','北京朝阳望京') values(now+" + i + "s, " + + random.nextInt(100) + "," + random.nextInt(100) + "," + random.nextInt(100) + "," + + random.nextInt(100) + "," + random.nextFloat() + "," + random.nextDouble() + "," + + random.nextBoolean() + ",'abc123ABC','北京朝阳望京')"; + stmt.execute(sql); + } + } + + final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041"; + try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) { + Statement stmt = conn.createStatement(); + stmt.execute("drop database if exists " + db2); + stmt.execute("create database if not exists " + db2); + stmt.execute("create table " + db2 + "." + stb2 + " (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint," + + " f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, " + + "t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(1000))"); + stmt.close(); + } + } +} diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengine2TDengineTest3.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengine2TDengineTest3.java new file mode 100644 index 00000000..991f7582 --- /dev/null +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengine2TDengineTest3.java @@ -0,0 +1,62 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import com.alibaba.datax.core.Engine; +import org.junit.Before; +import org.junit.Test; + +import java.sql.*; +import java.text.SimpleDateFormat; +import java.util.Random; + +public class TDengine2TDengineTest3 { + + private static final String host1 = "192.168.56.105"; + private static final String host2 = "192.168.1.93"; + private static Random random = new Random(System.currentTimeMillis()); + + @Test + public void case_03() throws Throwable { + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2t-3.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + + @Before + public void before() throws SQLException { + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + String ts = sdf.format(new Date(System.currentTimeMillis())); + + final String url = "jdbc:TAOS-RS://" + host1 + ":6041"; + try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) { + Statement stmt = conn.createStatement(); + + stmt.execute("drop database if exists db1"); + stmt.execute("create database if not exists db1"); + stmt.execute("create table db1.stb1 (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, " + + "f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, " + + "t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(1000))"); + for (int i = 1; i <= 10; i++) { + String sql = "insert into db1.t" + i + " using db1.stb1 tags('" + ts + "'," + i + ",2,3,4,5.0,6.0,true,'abc123ABC','北京朝阳望京') " + + "values(now+" + i + "s, " + random.nextInt(100) + "," + random.nextInt(100) + "," + + random.nextInt(100) + "," + random.nextInt(100) + "," + random.nextFloat() + "," + + random.nextDouble() + "," + random.nextBoolean() + ",'abc123ABC','北京朝阳望京')"; + stmt.execute(sql); + } + } + + final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041"; + try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) { + Statement stmt = conn.createStatement(); + stmt.execute("drop database if exists db2"); + stmt.execute("create database if not exists db2"); + stmt.execute("create table db2.stb2 (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint," + + " f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, " + + "t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(1000))"); + + stmt.execute("create table db2.t1 using db2.stb2 tags('" + ts + "',1,2,3,4,5.0,6.0,true,'abc123ABC','北京朝阳望京')"); + stmt.close(); + } + + } +} diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengine2TDengineTest4.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengine2TDengineTest4.java new file mode 100644 index 00000000..46ce0fdf --- /dev/null +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengine2TDengineTest4.java @@ -0,0 +1,57 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import com.alibaba.datax.core.Engine; +import org.junit.Before; +import org.junit.Test; + +import java.sql.*; +import java.text.SimpleDateFormat; +import java.util.Random; + +public class TDengine2TDengineTest4 { + + private static final String host1 = "192.168.56.105"; + private static final String host2 = "192.168.1.93"; + private static Random random = new Random(System.currentTimeMillis()); + + @Test + public void case_04() throws Throwable { + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2t-4.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + + @Before + public void before() throws SQLException { + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + String ts = sdf.format(new Date(System.currentTimeMillis())); + + final String url = "jdbc:TAOS-RS://" + host1 + ":6041"; + try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) { + Statement stmt = conn.createStatement(); + + stmt.execute("drop database if exists db1"); + stmt.execute("create database if not exists db1"); + stmt.execute("create table db1.weather (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, " + + "f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100))"); + for (int i = 1; i <= 10; i++) { + String sql = "insert into db1.weather values(now+" + i + "s, " + random.nextInt(100) + "," + random.nextInt(100) + "," + + random.nextInt(100) + "," + random.nextInt(100) + "," + random.nextFloat() + "," + + random.nextDouble() + "," + random.nextBoolean() + ",'abc123ABC','北京朝阳望京')"; + stmt.execute(sql); + } + } + + final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041"; + try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) { + Statement stmt = conn.createStatement(); + stmt.execute("drop database if exists db2"); + stmt.execute("create database if not exists db2"); + stmt.execute("create table db2.weather (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, " + + "f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100))"); + stmt.close(); + } + + } +} diff --git a/tdenginewriter/src/test/resources/mysql2tdengine.json b/tdenginewriter/src/test/resources/m2t-1.json similarity index 63% rename from tdenginewriter/src/test/resources/mysql2tdengine.json rename to tdenginewriter/src/test/resources/m2t-1.json index 38485533..dcacb4b2 100644 --- a/tdenginewriter/src/test/resources/mysql2tdengine.json +++ b/tdenginewriter/src/test/resources/m2t-1.json @@ -8,16 +8,25 @@ "username": "root", "password": "123456", "column": [ - "*" + "ts", + "dt", + "f1", + "f2", + "f3", + "f4", + "f5", + "f6", + "f7", + "f8" ], "splitPk": "id", "connection": [ { "table": [ - "t" + "stb1" ], "jdbcUrl": [ - "jdbc:mysql://192.168.56.105:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8" + "jdbc:mysql://192.168.56.105:3306/db1?useSSL=false&useUnicode=true&characterEncoding=utf8" ] } ] @@ -30,16 +39,22 @@ "password": "taosdata", "column": [ "ts", + "dt", "f1", "f2", - "t1" + "f3", + "f4", + "f5", + "f6", + "f7", + "f8" ], "connection": [ { "table": [ - "st" + "stb2" ], - "jdbcUrl": "jdbc:TAOS://192.168.56.105:6030/test?timestampFormat=TIMESTAMP" + "jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2" } ], "batchSize": 1000, diff --git a/tdenginewriter/src/test/resources/t2t-1.json b/tdenginewriter/src/test/resources/t2t-1.json new file mode 100644 index 00000000..5ca04d9a --- /dev/null +++ b/tdenginewriter/src/test/resources/t2t-1.json @@ -0,0 +1,94 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "tdenginereader", + "parameter": { + "username": "root", + "password": "taosdata", + "connection": [ + { + "table": [ + "stb1" + ], + "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1?timestampFormat=TIMESTAMP" + } + ], + "column": [ + "tbname", + "ts", + "f1", + "f2", + "f3", + "f4", + "f5", + "f6", + "f7", + "f8", + "f9", + "t1", + "t2", + "t3", + "t4", + "t5", + "t6", + "t7", + "t8", + "t9", + "t10" + ], + "beginDateTime": "2022-02-15 00:00:00", + "endDateTime": "2022-02-16 00:00:00", + "splitInterval": "1d" + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "tbname", + "ts", + "f1", + "f2", + "f3", + "f4", + "f5", + "f6", + "f7", + "f8", + "f9", + "t1", + "t2", + "t3", + "t4", + "t5", + "t6", + "t7", + "t8", + "t9", + "t10" + ], + "connection": [ + { + "table": [ + "stb2" + ], + "jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2?timestampFormat=TIMESTAMP" + } + ], + "batchSize": 1000, + "ignoreTagsUnmatched": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/t2t-2.json b/tdenginewriter/src/test/resources/t2t-2.json new file mode 100644 index 00000000..18130b6e --- /dev/null +++ b/tdenginewriter/src/test/resources/t2t-2.json @@ -0,0 +1,92 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "tdenginereader", + "parameter": { + "username": "root", + "password": "taosdata", + "connection": [ + { + "table": [ + "stb1" + ], + "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1?timestampFormat=TIMESTAMP" + } + ], + "column": [ + "ts", + "f1", + "f2", + "f3", + "f4", + "f5", + "f6", + "f7", + "f8", + "f9", + "t1", + "t2", + "t3", + "t4", + "t5", + "t6", + "t7", + "t8", + "t9", + "t10" + ], + "beginDateTime": "2022-02-15 00:00:00", + "endDateTime": "2022-02-16 00:00:00", + "splitInterval": "1d" + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "ts", + "f1", + "f2", + "f3", + "f4", + "f5", + "f6", + "f7", + "f8", + "f9", + "t1", + "t2", + "t3", + "t4", + "t5", + "t6", + "t7", + "t8", + "t9", + "t10" + ], + "connection": [ + { + "table": [ + "stb2" + ], + "jdbcUrl": "jdbc:TAOS://192.168.1.93:6030/db2" + } + ], + "batchSize": 1000, + "ignoreTagsUnmatched": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/t2t-3.json b/tdenginewriter/src/test/resources/t2t-3.json new file mode 100644 index 00000000..e0a22959 --- /dev/null +++ b/tdenginewriter/src/test/resources/t2t-3.json @@ -0,0 +1,92 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "tdenginereader", + "parameter": { + "username": "root", + "password": "taosdata", + "connection": [ + { + "table": [ + "stb1" + ], + "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1?timestampFormat=TIMESTAMP" + } + ], + "column": [ + "ts", + "f1", + "f2", + "f3", + "f4", + "f5", + "f6", + "f7", + "f8", + "f9", + "t1", + "t2", + "t3", + "t4", + "t5", + "t6", + "t7", + "t8", + "t9", + "t10" + ], + "beginDateTime": "2022-02-15 00:00:00", + "endDateTime": "2022-02-16 00:00:00", + "splitInterval": "1d" + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "ts", + "f1", + "f2", + "f3", + "f4", + "f5", + "f6", + "f7", + "f8", + "f9", + "t1", + "t2", + "t3", + "t4", + "t5", + "t6", + "t7", + "t8", + "t9", + "t10" + ], + "connection": [ + { + "table": [ + "t1" + ], + "jdbcUrl": "jdbc:TAOS://192.168.1.93:6030/db2?timestampFormat=TIMESTAMP" + } + ], + "batchSize": 1000, + "ignoreTagsUnmatched": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/t2t-4.json b/tdenginewriter/src/test/resources/t2t-4.json new file mode 100644 index 00000000..9f1533b1 --- /dev/null +++ b/tdenginewriter/src/test/resources/t2t-4.json @@ -0,0 +1,72 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "tdenginereader", + "parameter": { + "username": "root", + "password": "taosdata", + "connection": [ + { + "table": [ + "weather" + ], + "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1?timestampFormat=TIMESTAMP" + } + ], + "column": [ + "ts", + "f1", + "f2", + "f3", + "f4", + "f5", + "f6", + "f7", + "f8", + "f9" + ], + "beginDateTime": "2022-02-15 00:00:00", + "endDateTime": "2022-02-16 00:00:00", + "splitInterval": "1d" + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "ts", + "f1", + "f2", + "f3", + "f4", + "f5", + "f6", + "f7", + "f8", + "f9" + ], + "connection": [ + { + "table": [ + "weather" + ], + "jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2" + } + ], + "batchSize": 1000, + "ignoreTagsUnmatched": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/tdengine2tdengine.json b/tdenginewriter/src/test/resources/tdengine2tdengine.json deleted file mode 100644 index f0d0092d..00000000 --- a/tdenginewriter/src/test/resources/tdengine2tdengine.json +++ /dev/null @@ -1,51 +0,0 @@ -{ - "job": { - "content": [ - { - "reader": { - "name": "tdenginereader", - "parameter": { - "username": "root", - "password": "taosdata", - "connection": [ - { - "table": [ - "meters" - ], - "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/test?timestampFormat=TIMESTAMP" - } - ], - "column": [ - "ts", - "current", - "voltage", - "phase", - "groupid", - "location" - ], - "beginDateTime": "2017-07-14 10:40:00", - "endDateTime": "2017-08-14 10:40:00", - "splitInterval": "1d" - } - }, - "writer": { - "name": "tdenginewriter", - "parameter": { - "host": "192.168.56.105", - "port": 6030, - "dbName": "test2", - "username": "root", - "password": "taosdata", - "batchSize": 1000, - "stable": "meters" - } - } - } - ], - "setting": { - "speed": { - "channel": 1 - } - } - } -} \ No newline at end of file