diff --git a/tdenginereader/pom.xml b/tdenginereader/pom.xml
index 319152f8..075a2789 100644
--- a/tdenginereader/pom.xml
+++ b/tdenginereader/pom.xml
@@ -29,12 +29,6 @@
-        <dependency>
-            <groupId>com.alibaba</groupId>
-            <artifactId>fastjson</artifactId>
-            <version>1.2.78</version>
-        </dependency>
-
         <dependency>
             <groupId>com.alibaba.datax.tdenginewriter</groupId>
             <artifactId>tdenginewriter</artifactId>
@@ -45,13 +39,7 @@
             <groupId>com.taosdata.jdbc</groupId>
             <artifactId>taos-jdbcdriver</artifactId>
-            <version>2.0.37</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.alibaba</groupId>
-                    <artifactId>fastjson</artifactId>
-                </exclusion>
-            </exclusions>
+            <version>2.0.39</version>
diff --git a/tdenginewriter/pom.xml b/tdenginewriter/pom.xml
index 791a4bdc..a7564e6b 100644
--- a/tdenginewriter/pom.xml
+++ b/tdenginewriter/pom.xml
@@ -20,22 +20,10 @@
-        <dependency>
-            <groupId>com.alibaba</groupId>
-            <artifactId>fastjson</artifactId>
-            <version>1.2.78</version>
-        </dependency>
-
         <dependency>
             <groupId>com.taosdata.jdbc</groupId>
             <artifactId>taos-jdbcdriver</artifactId>
-            <version>2.0.37</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.alibaba</groupId>
-                    <artifactId>fastjson</artifactId>
-                </exclusion>
-            </exclusions>
+            <version>2.0.39</version>
@@ -74,15 +62,16 @@
             <version>5.1.49</version>
             <scope>test</scope>
+
-
-
-
-
-
-
-
-
+
+
+
+
+
+
+
+
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java
index 30ba7e23..84adaf72 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java
@@ -17,19 +17,20 @@ import java.sql.*;
import java.util.*;
import java.util.Date;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
public class DefaultDataHandler implements DataHandler {
+ private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
+
static {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
} catch (ClassNotFoundException e) {
- e.printStackTrace();
+ LOG.error(e.getMessage(), e);
}
}
- private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
-
private final TaskPluginCollector taskPluginCollector;
private String username;
private String password;
@@ -47,15 +48,15 @@ public class DefaultDataHandler implements DataHandler {
this.tableMetas = tableMetas;
}
- public void setColumnMetas(Map<String, List<ColumnMeta>> columnMetas) {
- this.columnMetas = columnMetas;
+ public void setTbnameColumnMetasMap(Map<String, List<ColumnMeta>> tbnameColumnMetasMap) {
+ this.tbnameColumnMetasMap = tbnameColumnMetasMap;
}
public void setSchemaManager(SchemaManager schemaManager) {
this.schemaManager = schemaManager;
}
- private Map<String, List<ColumnMeta>> columnMetas;
+ private Map<String, List<ColumnMeta>> tbnameColumnMetasMap;
public DefaultDataHandler(Configuration configuration, TaskPluginCollector taskPluginCollector) {
this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME);
@@ -73,14 +74,13 @@ public class DefaultDataHandler implements DataHandler {
int count = 0;
int affectedRows = 0;
-
try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established.");
// prepare table_name -> table_meta
this.schemaManager = new SchemaManager(conn);
this.tableMetas = schemaManager.loadTableMeta(tables);
// prepare table_name -> column_meta
- this.columnMetas = schemaManager.loadColumnMetas(tables);
+ this.tbnameColumnMetasMap = schemaManager.loadColumnMetas(tables);
List<Record> recordBatch = new ArrayList<>();
Record record;
@@ -152,10 +152,12 @@ public class DefaultDataHandler implements DataHandler {
TableMeta tableMeta = tableMetas.get(table);
switch (tableMeta.tableType) {
case SUP_TABLE: {
- if (columns.contains("tbname"))
+ if (columns.contains("tbname")) {
affectedRows += writeBatchToSupTableBySQL(conn, table, recordBatch);
- else
- affectedRows += writeBatchToSupTableBySchemaless(conn, table, recordBatch);
+ } else {
+ Map<String, String> tag2Tbname = schemaManager.loadTagTableNameMap(table);
+ affectedRows += writeBatchToSupTableWithoutTbname(conn, table, recordBatch, tag2Tbname);
+ }
}
break;
case SUB_TABLE:
@@ -169,13 +171,82 @@ public class DefaultDataHandler implements DataHandler {
return affectedRows;
}
+ private int writeBatchToSupTableWithoutTbname(Connection conn, String table, List<Record> recordBatch, Map<String, String> tag2Tbname) throws SQLException {
+ List<ColumnMeta> columnMetas = tbnameColumnMetasMap.get(table);
+ List<Record> subTableExist = filterSubTableExistRecords(recordBatch, columnMetas, tag2Tbname);
+ List<Record> subTableNotExist = filterSubTableNotExistRecords(recordBatch, columnMetas, tag2Tbname);
+
+ int affectedRows = 0;
+ Map<String, List<Record>> subTableRecordsMap = splitRecords(subTableExist, columnMetas, tag2Tbname);
+
+ List<String> subTables = new ArrayList<>(subTableRecordsMap.keySet());
+ this.tbnameColumnMetasMap.putAll(schemaManager.loadColumnMetas(subTables));
+
+ for (String subTable : subTableRecordsMap.keySet()) {
+ List<Record> subTableRecords = subTableRecordsMap.get(subTable);
+ affectedRows += writeBatchToNormalTable(conn, subTable, subTableRecords);
+ }
+ if (!subTableNotExist.isEmpty())
+ affectedRows += writeBatchToSupTableBySchemaless(conn, table, subTableNotExist);
+ return affectedRows;
+ }
+
+ private List<Record> filterSubTableExistRecords(List<Record> recordBatch, List<ColumnMeta> columnMetas, Map<String, String> tag2Tbname) {
+ return recordBatch.stream().filter(record -> {
+ String tagStr = getTagString(columnMetas, record);
+ return tag2Tbname.containsKey(tagStr);
+ }).collect(Collectors.toList());
+ }
+
+ private List<Record> filterSubTableNotExistRecords(List<Record> recordBatch, List<ColumnMeta> columnMetas, Map<String, String> tag2Tbname) {
+ return recordBatch.stream().filter(record -> {
+ String tagStr = getTagString(columnMetas, record);
+ return !tag2Tbname.containsKey(tagStr);
+ }).collect(Collectors.toList());
+ }
+
+ private Map<String, List<Record>> splitRecords(List<Record> subTableExist, List<ColumnMeta> columnMetas, Map<String, String> tag2Tbname) {
+ Map<String, List<Record>> ret = new HashMap<>();
+ for (Record record : subTableExist) {
+ String tagstr = getTagString(columnMetas, record);
+ String tbname = tag2Tbname.get(tagstr);
+ if (ret.containsKey(tbname)) {
+ ret.get(tbname).add(record);
+ } else {
+ List<Record> list = new ArrayList<>();
+ list.add(record);
+ ret.put(tbname, list);
+ }
+ }
+ return ret;
+ }
+
+ private String getTagString(List<ColumnMeta> columnMetas, Record record) {
+ return IntStream.range(0, columnMetas.size()).mapToObj(colIndex -> {
+ ColumnMeta columnMeta = columnMetas.get(colIndex);
+ if (columnMeta.isTag) {
+ Column column = record.getColumn(colIndex);
+ switch (columnMeta.type) {
+ case "TINYINT":
+ case "SMALLINT":
+ case "INT":
+ case "BIGINT":
+ return column.asLong().toString();
+ default:
+ return column.asString();
+ }
+ }
+ return "";
+ }).collect(Collectors.joining());
+ }
+
/**
* insert into record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
* record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
* record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
*/
private int writeBatchToSupTableBySQL(Connection conn, String table, List<Record> recordBatch) throws SQLException {
- List<ColumnMeta> columnMetas = this.columnMetas.get(table);
+ List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);
StringBuilder sb = new StringBuilder("insert into");
for (Record record : recordBatch) {
@@ -257,7 +328,7 @@ public class DefaultDataHandler implements DataHandler {
int count = 0;
TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
- List<ColumnMeta> columnMetaList = this.columnMetas.get(table);
+ List<ColumnMeta> columnMetaList = this.tbnameColumnMetasMap.get(table);
ColumnMeta ts = columnMetaList.stream().filter(colMeta -> colMeta.isPrimaryKey).findFirst().get();
List<String> lines = new ArrayList<>();
@@ -394,7 +465,7 @@ public class DefaultDataHandler implements DataHandler {
* insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)])
*/
private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
- List<ColumnMeta> columnMetas = this.columnMetas.get(table);
+ List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);
StringBuilder sb = new StringBuilder();
sb.append("insert into ").append(table).append(" ")
@@ -464,18 +535,18 @@ public class DefaultDataHandler implements DataHandler {
* sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...)
*/
private int writeBatchToNormalTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
- List<ColumnMeta> columnMetas = this.columnMetas.get(table);
+ List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);
StringBuilder sb = new StringBuilder();
sb.append("insert into ").append(table)
.append(" ")
- .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
+ .append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
return colMeta.field;
}).collect(Collectors.joining(",", "(", ")")))
.append(" values ");
for (Record record : recordBatch) {
- sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
+ sb.append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
return buildColumnValue(colMeta, record);
}).collect(Collectors.joining(",", "(", ")")));
}
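
Note: below is a minimal standalone sketch (not part of the patch) of the tag-key routing that writeBatchToSupTableWithoutTbname introduces: each record is keyed by the concatenation of its tag values; keys that match an existing sub-table are grouped and written as plain SQL inserts, and the rest fall back to the schemaless writer against the super table. TagRecord and the sample values are illustrative stand-ins for DataX's Record/ColumnMeta.

```java
import java.util.*;
import java.util.stream.Collectors;

public class TagKeyRoutingSketch {
    // Illustrative stand-in for a DataX Record restricted to its tag columns.
    static class TagRecord {
        final List<String> tagValues; // tag column values, in column order
        TagRecord(String... tags) { this.tagValues = Arrays.asList(tags); }
        // Key = tag values joined with the empty splitter, mirroring
        // TAG_TABLE_NAME_MAP_KEY_SPLITTER = "" in SchemaManager.
        String tagKey() { return String.join("", tagValues); }
    }

    public static void main(String[] args) {
        // tag key -> existing sub-table, as produced by loadTagTableNameMap()
        Map<String, String> tag2Tbname = new HashMap<>();
        tag2Tbname.put("11.1abc", "tb5");
        tag2Tbname.put("22.2defg", "tb6");

        List<TagRecord> batch = Arrays.asList(
                new TagRecord("1", "1.1", "abc"),  // matches tb5
                new TagRecord("2", "2.2", "defg"), // matches tb6
                new TagRecord("3", "3.3", "hij")); // no sub-table yet

        // Matching records are grouped per sub-table for plain SQL inserts...
        Map<String, List<TagRecord>> bySubTable = batch.stream()
                .filter(r -> tag2Tbname.containsKey(r.tagKey()))
                .collect(Collectors.groupingBy(r -> tag2Tbname.get(r.tagKey())));
        // ...and the misses fall back to the schemaless writer.
        List<TagRecord> schemalessFallback = batch.stream()
                .filter(r -> !tag2Tbname.containsKey(r.tagKey()))
                .collect(Collectors.toList());

        System.out.println("sub-table batches: " + bySubTable.keySet());          // [tb5, tb6]
        System.out.println("schemaless fallback: " + schemalessFallback.size()); // 1
    }
}
```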
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java
index c48b7942..fc0c002d 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java
@@ -5,17 +5,18 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.*;
import java.util.*;
+import java.util.stream.Collectors;
public class SchemaManager {
private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class);
+// private static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = "_";
+ private static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = "";
private final Connection conn;
private TimestampPrecision precision;
+ private Map<String, Map<String, String>> tags2tbnameMaps = new HashMap<>();
public SchemaManager(Connection conn) {
this.conn = conn;
@@ -169,4 +170,37 @@ public class SchemaManager {
return columnMeta;
}
+ public Map<String, String> loadTagTableNameMap(String table) throws SQLException {
+ if (tags2tbnameMaps.containsKey(table))
+ return tags2tbnameMaps.get(table);
+ Map<String, String> tags2tbname = new HashMap<>();
+ try (Statement stmt = conn.createStatement()) {
+ // describe table
+ List<String> tags = new ArrayList<>();
+ ResultSet rs = stmt.executeQuery("describe " + table);
+ while (rs.next()) {
+ String note = rs.getString("Note");
+ if ("TAG".equals(note)) {
+ tags.add(rs.getString("Field"));
+ }
+ }
+ // select distinct t1, t2, ..., tbname from stb
+ rs = stmt.executeQuery("select distinct " + String.join(",", tags) + ",tbname from " + table);
+ while (rs.next()) {
+ ResultSet finalRs = rs;
+ String tagStr = tags.stream().map(t -> {
+ try {
+ return finalRs.getString(t);
+ } catch (SQLException e) {
+ LOG.error(e.getMessage(), e);
+ }
+ return "NULL";
+ }).collect(Collectors.joining(TAG_TABLE_NAME_MAP_KEY_SPLITTER));
+ String tbname = rs.getString("tbname");
+ tags2tbname.put(tagStr, tbname);
+ }
+ }
+ tags2tbnameMaps.put(table, tags2tbname);
+ return tags2tbname;
+ }
}
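
Note: a usage sketch of the new SchemaManager.loadTagTableNameMap API, run against the stb3/tb5/tb6 fixture created in SchemaManagerTest; the JDBC URL and credentials are placeholders. Keys are the tag values joined with the empty splitter, so tags (1, 1.1, 'abc') map to "11.1abc".

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Map;

public class LoadTagTableNameMapSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder connection details; any TDengine instance holding the
        // stb3 fixture from SchemaManagerTest would do.
        try (Connection conn = DriverManager.getConnection(
                "jdbc:TAOS-RS://localhost:6041", "root", "taosdata")) {
            SchemaManager schemaManager = new SchemaManager(conn);
            Map<String, String> tag2Tbname = schemaManager.loadTagTableNameMap("stb3");
            // Expected, per the test assertions:
            //   "11.1abc"  -> "tb5"
            //   "22.2defg" -> "tb6"
            tag2Tbname.forEach((tags, tbname) -> System.out.println(tags + " -> " + tbname));
        }
    }
}
```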
diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java
index 8840aa28..46e601ad 100644
--- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java
+++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java
@@ -5,7 +5,6 @@ import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
-import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.record.DefaultRecord;
import org.junit.AfterClass;
@@ -60,7 +59,7 @@ public class DefaultDataHandlerTest {
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
- handler.setColumnMetas(columnMetas);
+ handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);
@@ -98,7 +97,7 @@ public class DefaultDataHandlerTest {
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
- handler.setColumnMetas(columnMetas);
+ handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);
@@ -138,7 +137,7 @@ public class DefaultDataHandlerTest {
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
- handler.setColumnMetas(columnMetas);
+ handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(connection, recordList);
@@ -177,7 +176,7 @@ public class DefaultDataHandlerTest {
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
- handler.setColumnMetas(columnMetas);
+ handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);
@@ -216,7 +215,7 @@ public class DefaultDataHandlerTest {
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
- handler.setColumnMetas(columnMetas);
+ handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);
@@ -255,7 +254,7 @@ public class DefaultDataHandlerTest {
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
- handler.setColumnMetas(columnMetas);
+ handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);
diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mongo2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mongo2TDengineTest.java
new file mode 100644
index 00000000..2356b6f8
--- /dev/null
+++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mongo2TDengineTest.java
@@ -0,0 +1,16 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.core.Engine;
+import org.junit.Test;
+
+public class Mongo2TDengineTest {
+
+ @Test
+ public void case01() throws Throwable {
+
+ // when
+ String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/mongo2t.json"};
+ System.setProperty("datax.home", "../target/datax/datax");
+ Engine.entry(params);
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManagerTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManagerTest.java
index 6df0de1d..3708e6f9 100644
--- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManagerTest.java
+++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManagerTest.java
@@ -1,6 +1,5 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
-import com.alibaba.fastjson.util.TypeUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -62,6 +61,23 @@ public class SchemaManagerTest {
Assert.assertEquals(4, stb1.size());
}
+ @Test
+ public void loadTagTableNameMap() throws SQLException {
+ // given
+ SchemaManager schemaManager = new SchemaManager(conn);
+ String table = "stb3";
+
+ // when
+ Map<String, String> tagTableMap = schemaManager.loadTagTableNameMap(table);
+
+ // then
+ Assert.assertEquals(2, tagTableMap.keySet().size());
+ Assert.assertTrue(tagTableMap.containsKey("11.1abc"));
+ Assert.assertTrue(tagTableMap.containsKey("22.2defg"));
+ Assert.assertEquals("tb5", tagTableMap.get("11.1abc"));
+ Assert.assertEquals("tb6", tagTableMap.get("22.2defg"));
+ }
+
@BeforeClass
public static void beforeClass() throws SQLException {
conn = DriverManager.getConnection("jdbc:TAOS-RS://192.168.56.105:6041", "root", "taosdata");
@@ -76,6 +92,9 @@ public class SchemaManagerTest {
stmt.execute("insert into tb3 using stb2 tags(1,1) values(now, 1, 2, 3)");
stmt.execute("insert into tb4 using stb2 tags(2,2) values(now, 1, 2, 3)");
stmt.execute("create table weather(ts timestamp, f1 int, f2 int, f3 int, t1 int, t2 int)");
+ stmt.execute("create table stb3(ts timestamp, f1 int) tags(t1 int, t2 float, t3 nchar(32))");
+ stmt.execute("insert into tb5 using stb3 tags(1,1.1,'abc') values(now, 1)");
+ stmt.execute("insert into tb6 using stb3 tags(2,2.2,'defg') values(now, 2)");
}
}
diff --git a/tdenginewriter/src/test/resources/incremental_sync/clean_env.sh b/tdenginewriter/src/test/resources/incremental_sync/clean_env.sh
new file mode 100755
index 00000000..f3dca7c1
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/clean_env.sh
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+datax_home_dir=$(dirname $(readlink -f "$0"))
+
+curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'drop table if exists db2.stb2;' 192.168.1.93:6041/rest/sql
+curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'create table if not exists db2.stb2 (`ts` TIMESTAMP,`f2` SMALLINT,`f4` BIGINT,`f5` FLOAT,`f6` DOUBLE,`f7` DOUBLE,`f8` BOOL,`f9` NCHAR(100),`f10` NCHAR(200)) TAGS (`f1` TINYINT,`f3` INT);' 192.168.1.93:6041/rest/sql
+
+rm -f ${datax_home_dir}/log/*
+rm -f ${datax_home_dir}/job/*.csv
\ No newline at end of file
diff --git a/tdenginewriter/src/test/resources/incremental_sync/csv2t-jni.json b/tdenginewriter/src/test/resources/incremental_sync/csv2t-jni.json
new file mode 100644
index 00000000..625c3801
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/csv2t-jni.json
@@ -0,0 +1,106 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "txtfilereader",
+ "parameter": {
+ "path": [
+ "/root/workspace/tmp/a.txt"
+ ],
+ "encoding": "UTF-8",
+ "column": [
+ {
+ "index": 0,
+ "type": "date",
+ "format": "yyyy-MM-dd HH:mm:ss.SSS"
+ },
+ {
+ "index": 1,
+ "type": "long"
+ },
+ {
+ "index": 2,
+ "type": "long"
+ },
+ {
+ "index": 3,
+ "type": "long"
+ },
+ {
+ "index": 4,
+ "type": "long"
+ },
+ {
+ "index": 5,
+ "type": "double"
+ },
+ {
+ "index": 6,
+ "type": "double"
+ },
+ {
+ "index": 7,
+ "type": "boolean"
+ },
+ {
+ "index": 8,
+ "type": "string"
+ },
+ {
+ "index": 9,
+ "type": "string"
+ },
+ {
+ "index": 10,
+ "type": "date",
+ "format": "yyyy-MM-dd HH:mm:ss.SSS"
+ },
+ {
+ "index": 11,
+ "type": "string"
+ }
+ ],
+ "fieldDelimiter": ","
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "username": "root",
+ "password": "taosdata",
+ "column": [
+ "ts",
+ "f1",
+ "f2",
+ "f3",
+ "f4",
+ "f5",
+ "f6",
+ "f7",
+ "f8",
+ "f9",
+ "t1",
+ "tbname"
+ ],
+ "connection": [
+ {
+ "table": [
+ "stb2"
+ ],
+ "jdbcUrl": "jdbc:TAOS://192.168.1.93:6030/db2"
+ }
+ ],
+ "batchSize": 1000,
+ "ignoreTagsUnmatched": true
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/test/resources/incremental_sync/csv2t-restful.json b/tdenginewriter/src/test/resources/incremental_sync/csv2t-restful.json
new file mode 100644
index 00000000..d852e2e2
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/csv2t-restful.json
@@ -0,0 +1,57 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "txtfilereader",
+ "parameter": {
+ "path": [
+ "/root/workspace/tmp/a.txt"
+ ],
+ "encoding": "UTF-8",
+ "column": [
+ "*"
+ ],
+ "fieldDelimiter": ","
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "username": "root",
+ "password": "taosdata",
+ "column": [
+ "ts",
+ "f1",
+ "f2",
+ "f3",
+ "f4",
+ "f5",
+ "f6",
+ "f7",
+ "f8",
+ "f9",
+ "t1",
+ "tbname"
+ ],
+ "connection": [
+ {
+ "table": [
+ "stb2"
+ ],
+ "jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
+ }
+ ],
+ "batchSize": 1000,
+ "ignoreTagsUnmatched": true
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/test/resources/incremental_sync/dm2t-jni.json b/tdenginewriter/src/test/resources/incremental_sync/dm2t-jni.json
new file mode 100644
index 00000000..3e86bb8d
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/dm2t-jni.json
@@ -0,0 +1,62 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "rdbmsreader",
+ "parameter": {
+ "username": "TESTUSER",
+ "password": "test123456",
+ "connection": [
+ {
+ "querySql": [
+ "select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1;"
+ ],
+ "jdbcUrl": [
+ "jdbc:dm://192.168.0.72:5236"
+ ]
+ }
+ ],
+ "fetchSize": 1024
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "username": "root",
+ "password": "taosdata",
+ "column": [
+ "tbname",
+ "ts",
+ "f1",
+ "f2",
+ "f3",
+ "f4",
+ "f5",
+ "f6",
+ "f7",
+ "f8",
+ "f9",
+ "f10"
+ ],
+ "connection": [
+ {
+ "table": [
+ "stb2"
+ ],
+ "jdbcUrl": "jdbc:TAOS://192.168.1.93:6030/db2"
+ }
+ ],
+ "batchSize": 1000,
+ "ignoreTagsUnmatched": true
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/test/resources/incremental_sync/dm2t-restful.json b/tdenginewriter/src/test/resources/incremental_sync/dm2t-restful.json
new file mode 100644
index 00000000..183786bf
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/dm2t-restful.json
@@ -0,0 +1,62 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "rdbmsreader",
+ "parameter": {
+ "username": "TESTUSER",
+ "password": "test123456",
+ "connection": [
+ {
+ "querySql": [
+ "select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1;"
+ ],
+ "jdbcUrl": [
+ "jdbc:dm://192.168.0.72:5236"
+ ]
+ }
+ ],
+ "fetchSize": 1024
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "username": "root",
+ "password": "taosdata",
+ "column": [
+ "tbname",
+ "ts",
+ "f1",
+ "f2",
+ "f3",
+ "f4",
+ "f5",
+ "f6",
+ "f7",
+ "f8",
+ "f9",
+ "f10"
+ ],
+ "connection": [
+ {
+ "table": [
+ "stb2"
+ ],
+ "jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
+ }
+ ],
+ "batchSize": 1000,
+ "ignoreTagsUnmatched": true
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/test/resources/incremental_sync/dm2t-update.json b/tdenginewriter/src/test/resources/incremental_sync/dm2t-update.json
new file mode 100644
index 00000000..d9285b23
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/dm2t-update.json
@@ -0,0 +1,63 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "rdbmsreader",
+ "parameter": {
+ "username": "TESTUSER",
+ "password": "test123456",
+ "connection": [
+ {
+ "querySql": [
+ "select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1"
+ ],
+ "jdbcUrl": [
+ "jdbc:dm://192.168.0.72:5236"
+ ]
+ }
+ ],
+ "where": "1=1",
+ "fetchSize": 1024
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "username": "root",
+ "password": "taosdata",
+ "column": [
+ "tbname",
+ "ts",
+ "f1",
+ "f2",
+ "f3",
+ "f4",
+ "f5",
+ "f6",
+ "f7",
+ "f8",
+ "f9",
+ "f10"
+ ],
+ "connection": [
+ {
+ "table": [
+ "stb2"
+ ],
+ "jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
+ }
+ ],
+ "batchSize": 1000,
+ "ignoreTagsUnmatched": true
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/test/resources/incremental_sync/dm2t_sync.sh b/tdenginewriter/src/test/resources/incremental_sync/dm2t_sync.sh
new file mode 100755
index 00000000..426c6233
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/dm2t_sync.sh
@@ -0,0 +1,57 @@
+#!/bin/bash
+
+#set -e  # disabled so the explicit job-status check below can run
+#set -x
+
+datax_home_dir=$(dirname $(readlink -f "$0"))
+table_name="stb1"
+update_key="ts"
+
+while getopts "hd:t:" arg; do
+ case $arg in
+ d)
+ datax_home_dir="$OPTARG"
+ ;;
+ t)
+ table_name="$OPTARG"
+ ;;
+ h)
+ echo "Usage: $(basename $0) -d [datax_home_dir] -t [table_name]"
+ echo " -h help"
+ exit 0
+ ;;
+ ?) # unknown option
+ echo "unknown argument"
+ exit 1
+ ;;
+ esac
+done
+
+if [[ -e ${datax_home_dir}/job/${table_name}.csv ]]; then
+ MAX_TIME=$(cat ${datax_home_dir}/job/${table_name}.csv)
+else
+ MAX_TIME="null"
+fi
+current_datetime=$(date +"%Y-%m-%d %H:%M:%S")
+current_timestamp=$(date +%s)
+
+if [ "$MAX_TIME" != "null" ]; then
+ WHERE="${update_key} >= '$MAX_TIME' and ${update_key} < '$current_datetime'"
+ sed "s/1=1/$WHERE/g" ${datax_home_dir}/job/dm2t-update.json >${datax_home_dir}/job/dm2t_${current_timestamp}.json
+ echo "incremental data synchronization, from '${MAX_TIME}' to '${current_datetime}'"
+ python ${datax_home_dir}/bin/datax.py ${datax_home_dir}/job/dm2t_${current_timestamp}.json 1> /dev/null 2>&1
+else
+ echo "full data synchronization, to '${current_datetime}'"
+ python ${datax_home_dir}/bin/datax.py ${datax_home_dir}/job/dm2t-update.json 1> /dev/null 2>&1
+fi
+
+if [[ $? -ne 0 ]]; then
+ echo "datax migration job failed"
+else
+ echo ${current_datetime} >$datax_home_dir/job/${table_name}.csv
+ echo "datax migration job success"
+fi
+
+rm -rf ${datax_home_dir}/job/dm2t_${current_timestamp}.json
+
+#while true; do ./dm2t_sync.sh; sleep 5s; done
\ No newline at end of file
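
Note: for reference, the high-water-mark logic dm2t_sync.sh implements — read the last synced timestamp, constrain extraction to [last, now), and persist the new bound only after a successful run — sketched in Java. The watermark path and the job invocation are placeholders.

```java
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.*;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;

public class IncrementalSyncSketch {
    private static final DateTimeFormatter FMT = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

    public static void main(String[] args) throws IOException {
        Path watermark = Paths.get("job", "stb1.csv"); // same file the script reads/writes
        String now = LocalDateTime.now().format(FMT);

        String where;
        if (Files.exists(watermark)) {
            // Incremental run: only rows in [last synced time, now).
            String last = new String(Files.readAllBytes(watermark), StandardCharsets.UTF_8).trim();
            where = "ts >= '" + last + "' and ts < '" + now + "'";
        } else {
            where = "1=1"; // first run: full sync, the same sentinel the sed command replaces
        }
        System.out.println("where: " + where);

        // ... substitute `where` into dm2t-update.json and launch the DataX job here ...
        boolean jobSucceeded = true; // placeholder for the real job status

        if (jobSucceeded) {
            // Persist the new high-water mark only on success, as the script does.
            Files.write(watermark, now.getBytes(StandardCharsets.UTF_8));
        }
    }
}
```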
diff --git a/tdenginewriter/src/test/resources/incremental_sync/t2dm-jni.json b/tdenginewriter/src/test/resources/incremental_sync/t2dm-jni.json
new file mode 100644
index 00000000..341f6293
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/t2dm-jni.json
@@ -0,0 +1,50 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "tdenginereader",
+ "parameter": {
+ "username": "root",
+ "password": "taosdata",
+ "column": [
+ "*"
+ ],
+ "connection": [
+ {
+ "table": [
+ "stb1"
+ ],
+ "jdbcUrl": "jdbc:TAOS://192.168.56.105:6030/db1"
+ }
+ ]
+ }
+ },
+ "writer": {
+ "name": "rdbmswriter",
+ "parameter": {
+ "connection": [
+ {
+ "table": [
+ "stb2"
+ ],
+ "jdbcUrl": "jdbc:dm://192.168.0.72:5236"
+ }
+ ],
+ "username": "TESTUSER",
+ "password": "test123456",
+ "table": "stb2",
+ "column": [
+ "*"
+ ]
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/test/resources/incremental_sync/t2dm-restful.json b/tdenginewriter/src/test/resources/incremental_sync/t2dm-restful.json
new file mode 100644
index 00000000..b2cf91e2
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/t2dm-restful.json
@@ -0,0 +1,50 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "tdenginereader",
+ "parameter": {
+ "username": "root",
+ "password": "taosdata",
+ "column": [
+ "*"
+ ],
+ "connection": [
+ {
+ "table": [
+ "stb1"
+ ],
+ "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1"
+ }
+ ]
+ }
+ },
+ "writer": {
+ "name": "rdbmswriter",
+ "parameter": {
+ "connection": [
+ {
+ "table": [
+ "stb2"
+ ],
+ "jdbcUrl": "jdbc:dm://192.168.0.72:5236"
+ }
+ ],
+ "username": "TESTUSER",
+ "password": "test123456",
+ "table": "stb2",
+ "column": [
+ "*"
+ ]
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/test/resources/incremental_sync/upload.sh b/tdenginewriter/src/test/resources/incremental_sync/upload.sh
new file mode 100755
index 00000000..388d275b
--- /dev/null
+++ b/tdenginewriter/src/test/resources/incremental_sync/upload.sh
@@ -0,0 +1,13 @@
+#!/bin/bash
+
+scp t2dm-restful.json root@192.168.56.105:/root/workspace/tmp/datax/job
+scp t2dm-jni.json root@192.168.56.105:/root/workspace/tmp/datax/job
+scp dm2t-restful.json root@192.168.56.105:/root/workspace/tmp/datax/job
+scp dm2t-jni.json root@192.168.56.105:/root/workspace/tmp/datax/job
+scp dm2t-update.json root@192.168.56.105:/root/workspace/tmp/datax/job
+scp csv2t-restful.json root@192.168.56.105:/root/workspace/tmp/datax/job
+scp csv2t-jni.json root@192.168.56.105:/root/workspace/tmp/datax/job
+
+
+scp dm2t_sync.sh root@192.168.56.105:/root/workspace/tmp/datax
+scp clean_env.sh root@192.168.56.105:/root/workspace/tmp/datax
\ No newline at end of file
diff --git a/tdenginewriter/src/test/resources/mongo2t.json b/tdenginewriter/src/test/resources/mongo2t.json
new file mode 100644
index 00000000..902e6f7c
--- /dev/null
+++ b/tdenginewriter/src/test/resources/mongo2t.json
@@ -0,0 +1,66 @@
+{
+ "job": {
+ "content": [
+ {
+ "reader": {
+ "name": "mongodbreader",
+ "parameter": {
+ "address": [
+ "192.168.1.213:27017"
+ ],
+ "userName": "",
+ "userPassword": "",
+ "dbName": "testdb",
+ "collectionName": "monitor_data",
+ "column": [
+ {
+ "name": "ct",
+ "type": "date"
+ },
+ {
+ "name": "pv",
+ "type": "float"
+ },
+ {
+ "name": "tv",
+ "type": "float"
+ },
+ {
+ "name": "pid",
+ "type": "float"
+ }
+ ]
+ }
+ },
+ "writer": {
+ "name": "tdenginewriter",
+ "parameter": {
+ "username": "root",
+ "password": "hmdata",
+ "column": [
+ "ts",
+ "pressure",
+ "temperature",
+ "position_id"
+ ],
+ "connection": [
+ {
+ "table": [
+ "pipeline_data"
+ ],
+ "jdbcUrl": "jdbc:TAOS-RS://192.168.1.213:6041/mongo3040"
+ }
+ ],
+ "batchSize": 1000,
+ "ignoreTagsUnmatched": true
+ }
+ }
+ }
+ ],
+ "setting": {
+ "speed": {
+ "channel": 1
+ }
+ }
+ }
+}
\ No newline at end of file