This commit is contained in:
zyyang 2022-02-18 14:13:03 +08:00
parent 67dd21edfe
commit 0680ce2e97
17 changed files with 572 additions and 244 deletions

View File

@ -74,6 +74,15 @@
<version>5.1.49</version> <version>5.1.49</version>
<scope>test</scope> <scope>test</scope>
</dependency> </dependency>
<!-- 添加 dm8 jdbc jar 包依赖-->
<dependency>
<groupId>com.dameng</groupId>
<artifactId>dm-jdbc</artifactId>
<version>1.8</version>
<scope>system</scope>
<systemPath>/Users/yangzy/IdeaProjects/DataX/tdenginewriter/src/test/resources/DmJdbcDriver18.jar
</systemPath>
</dependency>
</dependencies> </dependencies>

View File

@ -180,9 +180,21 @@ public class DefaultDataHandler implements DataHandler {
private String buildColumnValue(ColumnMeta colMeta, Record record) { private String buildColumnValue(ColumnMeta colMeta, Record record) {
Column column = record.getColumn(indexOf(colMeta.field)); Column column = record.getColumn(indexOf(colMeta.field));
TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
switch (column.getType()) { switch (column.getType()) {
case DATE: case DATE: {
return "'" + column.asString() + "'"; Date value = column.asDate();
switch (timestampPrecision) {
case MILLISEC:
return "" + (value.getTime());
case MICROSEC:
return "" + (value.getTime() * 1000);
case NANOSEC:
return "" + (value.getTime() * 1000_000);
default:
return "'" + column.asString() + "'";
}
}
case BYTES: case BYTES:
case STRING: case STRING:
if (colMeta.type.equals("TIMESTAMP")) if (colMeta.type.equals("TIMESTAMP"))

View File

@ -0,0 +1,122 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.core.Engine;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.Random;
public class DM2TDengineTest {
private String host1 = "192.168.0.72";
private String host2 = "192.168.1.93";
private final Random random = new Random(System.currentTimeMillis());
@Test
public void dm2t_case01() throws Throwable {
// given
createSupTable();
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/dm2t-1.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Test
public void dm2t_case02() throws Throwable {
// given
createSupAndSubTable();
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/dm2t-2.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Test
public void dm2t_case03() throws Throwable {
// given
createTable();
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/dm2t-3.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Test
public void dm2t_case04() throws Throwable {
// given
createSupTable();
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/dm2t-4.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
private void createSupTable() throws SQLException {
final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041";
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db2");
stmt.execute("create database if not exists db2");
stmt.execute("create table db2.stb2(ts timestamp, f2 smallint, f4 bigint,f5 float, " +
"f6 double, f7 double, f8 bool, f9 nchar(100), f10 nchar(200)) tags(f1 tinyint,f3 int)");
stmt.close();
}
}
private void createSupAndSubTable() throws SQLException {
final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041";
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db2");
stmt.execute("create database if not exists db2");
stmt.execute("create table db2.stb2(ts timestamp, f2 smallint, f4 bigint,f5 float, " +
"f6 double, f7 double, f8 bool, f9 nchar(100), f10 nchar(200)) tags(f1 tinyint,f3 int)");
for (int i = 0; i < 10; i++) {
stmt.execute("create table db2.t" + (i + 1) + "_" + i + " using db2.stb2 tags(" + (i + 1) + "," + i + ")");
}
stmt.close();
}
}
private void createTable() throws SQLException {
final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041";
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db2");
stmt.execute("create database if not exists db2");
stmt.execute("create table db2.stb2(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint,f5 float, " +
"f6 double, f7 double, f8 bool, f9 nchar(100), f10 nchar(200))");
stmt.close();
}
}
@Before
public void before() throws SQLException, ClassNotFoundException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
long ts = System.currentTimeMillis();
final String url = "jdbc:dm://" + host1 + ":5236";
try (Connection conn = DriverManager.getConnection(url, "TESTUSER", "test123456")) {
conn.setAutoCommit(true);
Statement stmt = conn.createStatement();
stmt.execute("drop table if exists stb1");
stmt.execute("create table stb1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, " +
"f6 double, f7 NUMERIC(10,2), f8 BIT, f9 VARCHAR(100), f10 VARCHAR2(200))");
for (int i = 0; i < 10; i++) {
String sql = "insert into stb1 values('" + sdf.format(new Date(ts + i * 1000)) + "'," + (i + 1) + "," +
random.nextInt(100) + "," + i + ",4,5.55,6.666,7.77," + (random.nextBoolean() ? 1 : 0) +
",'abcABC123','北京朝阳望京DM')";
stmt.execute(sql);
}
}
}
}

View File

@ -22,12 +22,13 @@ import java.util.stream.IntStream;
public class DefaultDataHandlerTest { public class DefaultDataHandlerTest {
private static final String host = "192.168.56.105"; private static final String host = "192.168.1.93";
private static Connection conn; private static Connection conn;
@Test @Test
public void writeSupTableBySQL() { public void writeSupTableBySQL() throws SQLException {
// given // given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" + Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," + "\"username\": \"root\"," +
"\"password\": \"taosdata\"," + "\"password\": \"taosdata\"," +
@ -55,6 +56,7 @@ public class DefaultDataHandlerTest {
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables); Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas); handler.setTableMetas(tableMetas);
handler.setColumnMetas(columnMetas); handler.setColumnMetas(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList); int count = handler.writeBatch(conn, recordList);
@ -63,8 +65,9 @@ public class DefaultDataHandlerTest {
} }
@Test @Test
public void writeSupTableBySQL_2() { public void writeSupTableBySQL_2() throws SQLException {
// given // given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" + Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," + "\"username\": \"root\"," +
"\"password\": \"taosdata\"," + "\"password\": \"taosdata\"," +
@ -91,6 +94,7 @@ public class DefaultDataHandlerTest {
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables); Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas); handler.setTableMetas(tableMetas);
handler.setColumnMetas(columnMetas); handler.setColumnMetas(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList); int count = handler.writeBatch(conn, recordList);
@ -99,8 +103,49 @@ public class DefaultDataHandlerTest {
} }
@Test @Test
public void writeSubTableWithTableName() { public void writeSupTableBySchemaless() throws SQLException {
// given // given
createSupTable();
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"column\": [\"ts\", \"f1\", \"f2\", \"t1\"]," +
"\"table\":[\"stb1\"]," +
"\"jdbcUrl\":\"jdbc:TAOS://" + host + ":6030/scm_test\"," +
"\"batchSize\": \"1000\"" +
"}");
String jdbcUrl = configuration.getString("jdbcUrl");
Connection connection = DriverManager.getConnection(jdbcUrl, "root", "taosdata");
long current = System.currentTimeMillis();
List<Record> recordList = IntStream.range(1, 11).mapToObj(i -> {
Record record = new DefaultRecord();
record.addColumn(new DateColumn(current + 1000 * i));
record.addColumn(new LongColumn(1));
record.addColumn(new LongColumn(2));
record.addColumn(new StringColumn("t" + i + " 22"));
return record;
}).collect(Collectors.toList());
// when
DefaultDataHandler handler = new DefaultDataHandler(configuration);
List<String> tables = configuration.getList("table", String.class);
SchemaManager schemaManager = new SchemaManager(connection);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
handler.setColumnMetas(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(connection, recordList);
// then
Assert.assertEquals(10, count);
}
@Test
public void writeSubTableWithTableName() throws SQLException {
// given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" + Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," + "\"username\": \"root\"," +
"\"password\": \"taosdata\"," + "\"password\": \"taosdata\"," +
@ -128,6 +173,7 @@ public class DefaultDataHandlerTest {
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables); Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas); handler.setTableMetas(tableMetas);
handler.setColumnMetas(columnMetas); handler.setColumnMetas(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList); int count = handler.writeBatch(conn, recordList);
@ -136,8 +182,9 @@ public class DefaultDataHandlerTest {
} }
@Test @Test
public void writeSubTableWithoutTableName() { public void writeSubTableWithoutTableName() throws SQLException {
// given // given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" + Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," + "\"username\": \"root\"," +
"\"password\": \"taosdata\"," + "\"password\": \"taosdata\"," +
@ -165,6 +212,7 @@ public class DefaultDataHandlerTest {
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables); Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas); handler.setTableMetas(tableMetas);
handler.setColumnMetas(columnMetas); handler.setColumnMetas(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList); int count = handler.writeBatch(conn, recordList);
@ -173,8 +221,9 @@ public class DefaultDataHandlerTest {
} }
@Test @Test
public void writeNormalTable() { public void writeNormalTable() throws SQLException {
// given // given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" + Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," + "\"username\": \"root\"," +
"\"password\": \"taosdata\"," + "\"password\": \"taosdata\"," +
@ -202,6 +251,7 @@ public class DefaultDataHandlerTest {
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables); Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas); handler.setTableMetas(tableMetas);
handler.setColumnMetas(columnMetas); handler.setColumnMetas(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList); int count = handler.writeBatch(conn, recordList);
@ -209,10 +259,8 @@ public class DefaultDataHandlerTest {
Assert.assertEquals(10, count); Assert.assertEquals(10, count);
} }
@BeforeClass private void createSupAndSubTable() throws SQLException {
public static void beforeClass() throws SQLException { try(Statement stmt = conn.createStatement()){
conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041", "root", "taosdata");
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists scm_test"); stmt.execute("drop database if exists scm_test");
stmt.execute("create database if not exists scm_test"); stmt.execute("create database if not exists scm_test");
stmt.execute("use scm_test"); stmt.execute("use scm_test");
@ -226,6 +274,20 @@ public class DefaultDataHandlerTest {
} }
} }
private void createSupTable() throws SQLException {
try (Statement stmt = conn.createStatement()){
stmt.execute("drop database if exists scm_test");
stmt.execute("create database if not exists scm_test");
stmt.execute("use scm_test");
stmt.execute("create table stb1(ts timestamp, f1 int, f2 int) tags(t1 nchar(32))");
}
}
@BeforeClass
public static void beforeClass() throws SQLException {
conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041", "root", "taosdata");
}
@AfterClass @AfterClass
public static void afterClass() throws SQLException { public static void afterClass() throws SQLException {
if (conn != null) { if (conn != null) {

View File

@ -1,87 +0,0 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.record.DefaultRecord;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class DefaultDataHandlerTest2 {
private static final String host = "192.168.1.93";
private static Connection conn;
@Test
public void writeSupTableBySchemaless() throws SQLException {
// given
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"column\": [\"ts\", \"f1\", \"f2\", \"t1\"]," +
"\"table\":[\"stb1\"]," +
"\"jdbcUrl\":\"jdbc:TAOS://" + host + ":6030/scm_test\"," +
"\"batchSize\": \"1000\"" +
"}");
String jdbcUrl = configuration.getString("jdbcUrl");
Connection connection = DriverManager.getConnection(jdbcUrl, "root", "taosdata");
long current = System.currentTimeMillis();
List<Record> recordList = IntStream.range(1, 11).mapToObj(i -> {
Record record = new DefaultRecord();
record.addColumn(new DateColumn(current + 1000 * i));
record.addColumn(new LongColumn(1));
record.addColumn(new LongColumn(2));
record.addColumn(new StringColumn("t" + i + " 22"));
return record;
}).collect(Collectors.toList());
// when
DefaultDataHandler handler = new DefaultDataHandler(configuration);
List<String> tables = configuration.getList("table", String.class);
SchemaManager schemaManager = new SchemaManager(connection);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
handler.setColumnMetas(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(connection, recordList);
// then
Assert.assertEquals(10, count);
}
@BeforeClass
public static void beforeClass() throws SQLException {
conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041", "root", "taosdata");
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists scm_test");
stmt.execute("create database if not exists scm_test");
stmt.execute("use scm_test");
stmt.execute("create table stb1(ts timestamp, f1 int, f2 int) tags(t1 nchar(32))");
}
}
@AfterClass
public static void afterClass() throws SQLException {
if (conn != null) {
conn.close();
}
}
}

View File

@ -11,7 +11,7 @@ public class Opentsdb2TDengineTest {
@Test @Test
public void opentsdb2tdengine() throws SQLException { public void opentsdb2tdengine() throws SQLException {
// when // when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/opentsdb2tdengine.json"}; String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/o2t-1.json"};
System.setProperty("datax.home", "../target/datax/datax"); System.setProperty("datax.home", "../target/datax/datax");
try { try {
Engine.entry(params); Engine.entry(params);

View File

@ -1,4 +0,0 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
public class Rdbms2TDengineTest {
}

View File

@ -5,20 +5,21 @@ import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import java.sql.*; import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Random; import java.util.Random;
public class TDengine2TDengineTest { public class TDengine2TDengineTest {
private static final String host1 = "192.168.56.105"; private static final String host1 = "192.168.56.105";
private static final String host2 = "192.168.1.93"; private static final String host2 = "192.168.1.93";
private static final String db1 = "db1"; private static final Random random = new Random(System.currentTimeMillis());
private static final String stb1 = "stb1";
private static final String db2 = "db2";
private static final String stb2 = "stb2";
private static Random random = new Random(System.currentTimeMillis());
@Test @Test
public void case_01() throws Throwable { public void case_01() throws Throwable {
// given
createSupTable();
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2t-1.json"}; String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2t-1.json"};
System.setProperty("datax.home", "../target/datax/datax"); System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params); Engine.entry(params);
@ -26,24 +27,93 @@ public class TDengine2TDengineTest {
@Test @Test
public void case_02() throws Throwable { public void case_02() throws Throwable {
// given
createSupTable();
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2t-2.json"}; String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2t-2.json"};
System.setProperty("datax.home", "../target/datax/datax"); System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params); Engine.entry(params);
} }
@Test
public void case_03() throws Throwable {
// given
createSupAndSubTable();
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2t-3.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Test
public void case_04() throws Throwable {
// given
createTable();
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2t-4.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
private void createTable() throws SQLException {
final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041";
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db2");
stmt.execute("create database if not exists db2");
stmt.execute("create table db2.weather (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, " +
"f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100))");
stmt.close();
}
}
private void createSupTable() throws SQLException {
final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041";
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db2");
stmt.execute("create database if not exists db2");
stmt.execute("create table db2.stb2 (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint," +
" f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, " +
"t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(1000))");
stmt.close();
}
}
private void createSupAndSubTable() throws SQLException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
final String ts = sdf.format(new Date(System.currentTimeMillis()));
final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041";
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db2");
stmt.execute("create database if not exists db2");
stmt.execute("create table db2.stb2 (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint," +
" f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, " +
"t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(1000))");
stmt.execute("create table db2.t1 using db2.stb2 tags('" + ts + "',1,2,3,4,5.0,6.0,true,'abc123ABC','北京朝阳望京')");
stmt.close();
}
}
@Before @Before
public void before() throws SQLException { public void before() throws SQLException {
final String url = "jdbc:TAOS-RS://" + host1 + ":6041"; final String url = "jdbc:TAOS-RS://" + host1 + ":6041";
try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) { try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) {
Statement stmt = conn.createStatement(); Statement stmt = conn.createStatement();
stmt.execute("drop database if exists " + db1); stmt.execute("drop database if exists db1");
stmt.execute("create database if not exists " + db1); stmt.execute("create database if not exists db1");
stmt.execute("create table " + db1 + "." + stb1 + " (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint," + stmt.execute("create table db1.stb1 (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint," +
" f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, " + " f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, " +
"t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(1000))"); "t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(1000))");
for (int i = 0; i < 10; i++) { for (int i = 0; i < 10; i++) {
String sql = "insert into " + db1 + ".t" + (i + 1) + " using " + db1 + "." + stb1 + " tags(now+" + i + "s," + String sql = "insert into db1.t" + (i + 1) + " using db1.stb1 tags(now+" + i + "s," +
random.nextInt(100) + "," + random.nextInt(100) + "," + random.nextInt(100) + "," + random.nextInt(100) + "," + random.nextInt(100) + "," + random.nextInt(100) + "," +
random.nextInt(100) + "," + random.nextFloat() + "," + random.nextDouble() + "," + random.nextInt(100) + "," + random.nextFloat() + "," + random.nextDouble() + "," +
random.nextBoolean() + ",'abc123ABC','北京朝阳望京') values(now+" + i + "s, " + random.nextBoolean() + ",'abc123ABC','北京朝阳望京') values(now+" + i + "s, " +
@ -53,16 +123,5 @@ public class TDengine2TDengineTest {
stmt.execute(sql); stmt.execute(sql);
} }
} }
final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041";
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists " + db2);
stmt.execute("create database if not exists " + db2);
stmt.execute("create table " + db2 + "." + stb2 + " (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint," +
" f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, " +
"t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(1000))");
stmt.close();
}
} }
} }

View File

@ -1,62 +0,0 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.core.Engine;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Random;
public class TDengine2TDengineTest3 {
private static final String host1 = "192.168.56.105";
private static final String host2 = "192.168.1.93";
private static Random random = new Random(System.currentTimeMillis());
@Test
public void case_03() throws Throwable {
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2t-3.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Before
public void before() throws SQLException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String ts = sdf.format(new Date(System.currentTimeMillis()));
final String url = "jdbc:TAOS-RS://" + host1 + ":6041";
try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db1");
stmt.execute("create database if not exists db1");
stmt.execute("create table db1.stb1 (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, " +
"f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, " +
"t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(1000))");
for (int i = 1; i <= 10; i++) {
String sql = "insert into db1.t" + i + " using db1.stb1 tags('" + ts + "'," + i + ",2,3,4,5.0,6.0,true,'abc123ABC','北京朝阳望京') " +
"values(now+" + i + "s, " + random.nextInt(100) + "," + random.nextInt(100) + "," +
random.nextInt(100) + "," + random.nextInt(100) + "," + random.nextFloat() + "," +
random.nextDouble() + "," + random.nextBoolean() + ",'abc123ABC','北京朝阳望京')";
stmt.execute(sql);
}
}
final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041";
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db2");
stmt.execute("create database if not exists db2");
stmt.execute("create table db2.stb2 (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint," +
" f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, " +
"t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(1000))");
stmt.execute("create table db2.t1 using db2.stb2 tags('" + ts + "',1,2,3,4,5.0,6.0,true,'abc123ABC','北京朝阳望京')");
stmt.close();
}
}
}

View File

@ -1,57 +0,0 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.core.Engine;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Random;
public class TDengine2TDengineTest4 {
private static final String host1 = "192.168.56.105";
private static final String host2 = "192.168.1.93";
private static Random random = new Random(System.currentTimeMillis());
@Test
public void case_04() throws Throwable {
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2t-4.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Before
public void before() throws SQLException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String ts = sdf.format(new Date(System.currentTimeMillis()));
final String url = "jdbc:TAOS-RS://" + host1 + ":6041";
try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db1");
stmt.execute("create database if not exists db1");
stmt.execute("create table db1.weather (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, " +
"f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100))");
for (int i = 1; i <= 10; i++) {
String sql = "insert into db1.weather values(now+" + i + "s, " + random.nextInt(100) + "," + random.nextInt(100) + "," +
random.nextInt(100) + "," + random.nextInt(100) + "," + random.nextFloat() + "," +
random.nextDouble() + "," + random.nextBoolean() + ",'abc123ABC','北京朝阳望京')";
stmt.execute(sql);
}
}
final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041";
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db2");
stmt.execute("create database if not exists db2");
stmt.execute("create table db2.weather (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, " +
"f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100))");
stmt.close();
}
}
}

View File

@ -0,0 +1,13 @@
select tablespace_name from dba_data_files;
create tablespace test datafile '/home/dmdba/dmdbms/data/DAMENG/test.dbf' size 32 autoextend on next 1 maxsize 1024;
create user TESTUSER identified by test123456 default tablespace test;
grant dba to TESTUSER;
select * from user_tables;
drop table if exists stb1;
create table stb1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, f6 double, f7 NUMERIC(10,2), f8 BIT, f9 VARCHAR(100), f10 VARCHAR2(200));

View File

@ -0,0 +1,62 @@
{
"job": {
"content": [
{
"reader": {
"name": "rdbmsreader",
"parameter": {
"username": "TESTUSER",
"password": "test123456",
"connection": [
{
"querySql": [
"select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1;"
],
"jdbcUrl": [
"jdbc:dm://192.168.0.72:5236"
]
}
],
"fetchSize": 1024
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"tbname",
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"f10"
],
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,62 @@
{
"job": {
"content": [
{
"reader": {
"name": "rdbmsreader",
"parameter": {
"username": "TESTUSER",
"password": "test123456",
"connection": [
{
"querySql": [
"select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1;"
],
"jdbcUrl": [
"jdbc:dm://192.168.0.72:5236"
]
}
],
"fetchSize": 1024,
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"tbname",
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"f10"
],
"connection": [
{
"table": [
"t1_0"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,76 @@
{
"job": {
"content": [
{
"reader": {
"name": "rdbmsreader",
"parameter": {
"username": "TESTUSER",
"password": "test123456",
"column": [
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"f10"
],
"splitPk": "f1",
"connection": [
{
"table": [
"stb1"
],
"jdbcUrl": [
"jdbc:dm://192.168.0.72:5236"
]
}
],
"fetchSize": 1024,
"where": "1 = 1"
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"f10"
],
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,61 @@
{
"job": {
"content": [
{
"reader": {
"name": "rdbmsreader",
"parameter": {
"username": "TESTUSER",
"password": "test123456",
"connection": [
{
"querySql": [
"select * from stb1"
],
"jdbcUrl": [
"jdbc:dm://192.168.0.72:5236"
]
}
],
"fetchSize": 1024
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"f10"
],
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:TAOS://192.168.1.93:6030/db2"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -10,7 +10,7 @@
"connection": [ "connection": [
{ {
"table": [ "table": [
"weather" "stb1"
], ],
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1?timestampFormat=TIMESTAMP" "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1?timestampFormat=TIMESTAMP"
} }