Mirror of https://github.com/alibaba/DataX.git (synced 2025-05-02 23:52:00 +08:00)

commit f4fe220c06

Merge pull request #1571 from dingxiaobo/mergeUpstream

tdengine updates from @zyyang-taosdata
@@ -29,11 +29,6 @@
         </exclusions>
     </dependency>

-    <dependency>
-        <groupId>com.alibaba</groupId>
-        <artifactId>fastjson</artifactId>
-    </dependency>
-
     <dependency>
         <groupId>com.alibaba.datax.tdenginewriter</groupId>
         <artifactId>tdenginewriter</artifactId>
@@ -44,13 +39,7 @@
     <dependency>
         <groupId>com.taosdata.jdbc</groupId>
         <artifactId>taos-jdbcdriver</artifactId>
-        <version>2.0.37</version>
-        <exclusions>
-            <exclusion>
-                <groupId>com.alibaba</groupId>
-                <artifactId>fastjson</artifactId>
-            </exclusion>
-        </exclusions>
+        <version>2.0.39</version>
     </dependency>

     <dependency>
@@ -93,7 +93,7 @@ public class TDengineReader extends Reader {
             }
             if (start >= end)
                 throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE,
-                        "The parameter " + Key.BEGIN_DATETIME + ": " + beginDatetime + " should be less than the parameter " + Key.END_DATETIME + ": " + endDatetime + ".");
+                        "The parameter [" + Key.BEGIN_DATETIME + "] should be less than the parameter [" + Key.END_DATETIME + "].");

         }
@@ -119,6 +119,7 @@ public class TDengineReader extends Reader {
                 }
             }

+            LOG.info("Configuration: {}", configurations);
             return configurations;
         }
     }
@@ -175,8 +175,8 @@ DataX data types can be mapped to TDengine data types
 | TDengine to TDengine | [supertable to subtable](../src/test/resources/t2t-3.json) |
 | TDengine to TDengine | [regular table to regular table](../src/test/resources/t2t-4.json) |
 | RDBMS to TDengine | [regular table to supertable, tbname specified](../src/test/resources/dm2t-1.json) |
-| RDBMS to TDengine | [regular table to supertable, tbname not specified](../src/test/resources/dm2t-2.json) |
-| RDBMS to TDengine | [regular table to subtable](../src/test/resources/dm2t-3.json) |
+| RDBMS to TDengine | [regular table to supertable, tbname not specified](../src/test/resources/dm2t-3.json) |
+| RDBMS to TDengine | [regular table to subtable](../src/test/resources/dm2t-2.json) |
 | RDBMS to TDengine | [regular table to regular table](../src/test/resources/dm2t-4.json) |
 | OpenTSDB to TDengine | [metric to regular table](../src/test/resources/o2t-1.json) |

@@ -20,21 +20,10 @@

     <dependencies>

-        <dependency>
-            <groupId>com.alibaba</groupId>
-            <artifactId>fastjson</artifactId>
-        </dependency>
-
         <dependency>
             <groupId>com.taosdata.jdbc</groupId>
             <artifactId>taos-jdbcdriver</artifactId>
-            <version>2.0.37</version>
-            <exclusions>
-                <exclusion>
-                    <groupId>com.alibaba</groupId>
-                    <artifactId>fastjson</artifactId>
-                </exclusion>
-            </exclusions>
+            <version>2.0.39</version>
         </dependency>

         <dependency>
@@ -73,6 +62,7 @@
             <version>5.1.49</version>
             <scope>test</scope>
         </dependency>
+
         <!-- add the dm8 jdbc jar dependency -->
         <!-- <dependency>-->
         <!-- <groupId>com.dameng</groupId>-->
@@ -1,6 +1,8 @@
 package com.alibaba.datax.plugin.writer.tdenginewriter;

 public class Constants {
-    public static final int DEFAULT_BATCH_SIZE = 1000;
+    public static final String DEFAULT_USERNAME = "root";
+    public static final String DEFAULT_PASSWORD = "taosdata";
+    public static final int DEFAULT_BATCH_SIZE = 1;
     public static final boolean DEFAULT_IGNORE_TAGS_UNMATCHED = false;
 }
@@ -17,10 +17,20 @@ import java.sql.*;
 import java.util.*;
 import java.util.Date;
 import java.util.stream.Collectors;
+import java.util.stream.IntStream;

 public class DefaultDataHandler implements DataHandler {
     private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);

+    static {
+        try {
+            Class.forName("com.taosdata.jdbc.TSDBDriver");
+            Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
+        } catch (ClassNotFoundException e) {
+            LOG.error(e.getMessage(), e);
+        }
+    }
+
     private final TaskPluginCollector taskPluginCollector;
     private String username;
     private String password;
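Note: the static block registers both JDBC drivers up front, so either connection style used by the job files in this change resolves without an explicit Class.forName at the call site. A minimal sketch of the two styles, assuming the TDengine endpoints used in the tests below are reachable:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;

    public class ConnectionSketch {
        public static void main(String[] args) throws SQLException {
            // native protocol, handled by com.taosdata.jdbc.TSDBDriver (port 6030)
            Connection nativeConn = DriverManager.getConnection(
                    "jdbc:TAOS://192.168.1.93:6030/db2", "root", "taosdata");
            nativeConn.close();
            // RESTful protocol, handled by com.taosdata.jdbc.rs.RestfulDriver (port 6041)
            Connection restConn = DriverManager.getConnection(
                    "jdbc:TAOS-RS://192.168.1.93:6041/db2", "root", "taosdata");
            restConn.close();
        }
    }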
@@ -38,27 +48,19 @@ public class DefaultDataHandler implements DataHandler {
         this.tableMetas = tableMetas;
     }

-    public void setColumnMetas(Map<String, List<ColumnMeta>> columnMetas) {
-        this.columnMetas = columnMetas;
+    public void setTbnameColumnMetasMap(Map<String, List<ColumnMeta>> tbnameColumnMetasMap) {
+        this.tbnameColumnMetasMap = tbnameColumnMetasMap;
     }

     public void setSchemaManager(SchemaManager schemaManager) {
         this.schemaManager = schemaManager;
     }

-    private Map<String, List<ColumnMeta>> columnMetas;
-
-    static {
-        try {
-            Class.forName("com.taosdata.jdbc.TSDBDriver");
-            Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
-        } catch (ClassNotFoundException ignored) {
-        }
-    }
+    private Map<String, List<ColumnMeta>> tbnameColumnMetasMap;

     public DefaultDataHandler(Configuration configuration, TaskPluginCollector taskPluginCollector) {
-        this.username = configuration.getString(Key.USERNAME);
-        this.password = configuration.getString(Key.PASSWORD);
+        this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME);
+        this.password = configuration.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD);
         this.jdbcUrl = configuration.getString(Key.JDBC_URL);
         this.batchSize = configuration.getInt(Key.BATCH_SIZE, Constants.DEFAULT_BATCH_SIZE);
         this.tables = configuration.getList(Key.TABLE, String.class);
@@ -74,13 +76,11 @@ public class DefaultDataHandler implements DataHandler {

         try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
             LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established.");
-            if (schemaManager == null) {
-                // prepare table_name -> table_meta
-                this.schemaManager = new SchemaManager(conn);
-                this.tableMetas = schemaManager.loadTableMeta(tables);
-                // prepare table_name -> column_meta
-                this.columnMetas = schemaManager.loadColumnMetas(tables);
-            }
+            // prepare table_name -> table_meta
+            this.schemaManager = new SchemaManager(conn);
+            this.tableMetas = schemaManager.loadTableMeta(tables);
+            // prepare table_name -> column_meta
+            this.tbnameColumnMetasMap = schemaManager.loadColumnMetas(tables);

             List<Record> recordBatch = new ArrayList<>();
             Record record;
@@ -91,7 +91,7 @@ public class DefaultDataHandler implements DataHandler {
                 try {
                     recordBatch.add(record);
                     affectedRows += writeBatch(conn, recordBatch);
-                } catch (Exception e) {
+                } catch (SQLException e) {
                     LOG.warn("use one row insert. because:" + e.getMessage());
                     affectedRows += writeEachRow(conn, recordBatch);
                 }
@@ -103,7 +103,7 @@ public class DefaultDataHandler implements DataHandler {
             if (!recordBatch.isEmpty()) {
                 try {
                     affectedRows += writeBatch(conn, recordBatch);
-                } catch (Exception e) {
+                } catch (SQLException e) {
                     LOG.warn("use one row insert. because:" + e.getMessage());
                     affectedRows += writeEachRow(conn, recordBatch);
                 }
@@ -127,8 +127,8 @@ public class DefaultDataHandler implements DataHandler {
             recordList.add(record);
             try {
                 affectedRows += writeBatch(conn, recordList);
-            } catch (Exception e) {
-                LOG.error(e.getMessage(), e);
+            } catch (SQLException e) {
+                LOG.error(e.getMessage());
                 this.taskPluginCollector.collectDirtyRecord(record, e);
             }
         }
@@ -146,16 +146,18 @@ public class DefaultDataHandler implements DataHandler {
      * 3. for tb, assemble the sql, e.g.: data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => insert into tb(ts, f1, f2) values(ts, f1, f2)
      * 4. for t, assemble the sql, e.g.: data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, f3, t1, t2] => insert into t(ts, f1, f2, f3, t1, t2) values(ts, f1, f2, f3, t1, t2)
      */
-    public int writeBatch(Connection conn, List<Record> recordBatch) throws Exception {
+    public int writeBatch(Connection conn, List<Record> recordBatch) throws SQLException {
         int affectedRows = 0;
         for (String table : tables) {
             TableMeta tableMeta = tableMetas.get(table);
             switch (tableMeta.tableType) {
                 case SUP_TABLE: {
-                    if (columns.contains("tbname"))
+                    if (columns.contains("tbname")) {
                         affectedRows += writeBatchToSupTableBySQL(conn, table, recordBatch);
-                    else
-                        affectedRows += writeBatchToSupTableBySchemaless(conn, table, recordBatch);
+                    } else {
+                        Map<String, String> tag2Tbname = schemaManager.loadTagTableNameMap(table);
+                        affectedRows += writeBatchToSupTableWithoutTbname(conn, table, recordBatch, tag2Tbname);
+                    }
                 }
                 break;
                 case SUB_TABLE:
@@ -169,67 +171,105 @@ public class DefaultDataHandler implements DataHandler {
         return affectedRows;
     }

+    private int writeBatchToSupTableWithoutTbname(Connection conn, String table, List<Record> recordBatch, Map<String, String> tag2Tbname) throws SQLException {
+        List<ColumnMeta> columnMetas = tbnameColumnMetasMap.get(table);
+        List<Record> subTableExist = filterSubTableExistRecords(recordBatch, columnMetas, tag2Tbname);
+        List<Record> subTableNotExist = filterSubTableNotExistRecords(recordBatch, columnMetas, tag2Tbname);
+
+        int affectedRows = 0;
+        Map<String, List<Record>> subTableRecordsMap = splitRecords(subTableExist, columnMetas, tag2Tbname);
+
+        List<String> subTables = new ArrayList<>(subTableRecordsMap.keySet());
+        this.tbnameColumnMetasMap.putAll(schemaManager.loadColumnMetas(subTables));
+
+        for (String subTable : subTableRecordsMap.keySet()) {
+            List<Record> subTableRecords = subTableRecordsMap.get(subTable);
+            affectedRows += writeBatchToNormalTable(conn, subTable, subTableRecords);
+        }
+        if (!subTableNotExist.isEmpty())
+            affectedRows += writeBatchToSupTableBySchemaless(conn, table, subTableNotExist);
+        return affectedRows;
+    }
+
+    private List<Record> filterSubTableExistRecords(List<Record> recordBatch, List<ColumnMeta> columnMetas, Map<String, String> tag2Tbname) {
+        return recordBatch.stream().filter(record -> {
+            String tagStr = getTagString(columnMetas, record);
+            return tag2Tbname.containsKey(tagStr);
+        }).collect(Collectors.toList());
+    }
+
+    private List<Record> filterSubTableNotExistRecords(List<Record> recordBatch, List<ColumnMeta> columnMetas, Map<String, String> tag2Tbname) {
+        return recordBatch.stream().filter(record -> {
+            String tagStr = getTagString(columnMetas, record);
+            return !tag2Tbname.containsKey(tagStr);
+        }).collect(Collectors.toList());
+    }
+
+    private Map<String, List<Record>> splitRecords(List<Record> subTableExist, List<ColumnMeta> columnMetas, Map<String, String> tag2Tbname) {
+        Map<String, List<Record>> ret = new HashMap<>();
+        for (Record record : subTableExist) {
+            String tagstr = getTagString(columnMetas, record);
+            String tbname = tag2Tbname.get(tagstr);
+            if (ret.containsKey(tbname)) {
+                ret.get(tbname).add(record);
+            } else {
+                List<Record> list = new ArrayList<>();
+                list.add(record);
+                ret.put(tbname, list);
+            }
+        }
+        return ret;
+    }
+
+    private String getTagString(List<ColumnMeta> columnMetas, Record record) {
+        return IntStream.range(0, columnMetas.size()).mapToObj(colIndex -> {
+            ColumnMeta columnMeta = columnMetas.get(colIndex);
+            if (columnMeta.isTag) {
+                Column column = record.getColumn(colIndex);
+                switch (columnMeta.type) {
+                    case "TINYINT":
+                    case "SMALLINT":
+                    case "INT":
+                    case "BIGINT":
+                        return column.asLong().toString();
+                    default:
+                        return column.asString();
+                }
+            }
+            return "";
+        }).collect(Collectors.joining());
+    }
+
     /**
      * insert into record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
      * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
      * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
      */
-    private int writeBatchToSupTableBySQL(Connection conn, String table, List<Record> recordBatch) throws Exception {
-        List<ColumnMeta> columnMetas = this.columnMetas.get(table);
+    private int writeBatchToSupTableBySQL(Connection conn, String table, List<Record> recordBatch) throws SQLException {
+        List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);

         StringBuilder sb = new StringBuilder("insert into");
         for (Record record : recordBatch) {
             sb.append(" ").append(record.getColumn(indexOf("tbname")).asString())
                     .append(" using ").append(table)
-                    .append(" tags");
-//            sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
-//                return colMeta.isTag;
-//            }).map(colMeta -> {
-//                return buildColumnValue(colMeta, record);
-//            }).collect(Collectors.joining(",", "(", ")")));
-            sb.append("(");
-            for (int i = 0; i < columns.size(); i++) {
-                ColumnMeta colMeta = columnMetas.get(i);
-                if (!columns.contains(colMeta.field))
-                    continue;
-                if (!colMeta.isTag)
-                    continue;
-                String tagValue = buildColumnValue(colMeta, record);
-                if (i == 0) {
-                    sb.append(tagValue);
-                } else {
-                    sb.append(",").append(tagValue);
-                }
-            }
-            sb.append(")");
-
-            sb.append(" ").append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+                    .append(" tags")
+                    .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+                        return colMeta.isTag;
+                    }).map(colMeta -> {
+                        return buildColumnValue(colMeta, record);
+                    }).collect(Collectors.joining(",", "(", ")")))
+                    .append(" ")
+                    .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
                         return !colMeta.isTag;
                     }).map(colMeta -> {
                         return colMeta.field;
                     }).collect(Collectors.joining(",", "(", ")")))
-                    .append(" values");
-//            sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
-//                return !colMeta.isTag;
-//            }).map(colMeta -> {
-//                return buildColumnValue(colMeta, record);
-//            }).collect(Collectors.joining(",", "(", ")")));
-            sb.append("(");
-            for (int i = 0; i < columnMetas.size(); i++) {
-                ColumnMeta colMeta = columnMetas.get(i);
-                if (!columns.contains(colMeta.field))
-                    continue;
-                if (colMeta.isTag)
-                    continue;
-                String colValue = buildColumnValue(colMeta, record);
-                if (i == 0) {
-                    sb.append(colValue);
-                } else {
-                    sb.append(",").append(colValue);
-                }
-            }
-            sb.append(")");
+                    .append(" values")
+                    .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+                        return !colMeta.isTag;
+                    }).map(colMeta -> {
+                        return buildColumnValue(colMeta, record);
+                    }).collect(Collectors.joining(",", "(", ")")));
         }
         String sql = sb.toString();

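For orientation, a sketch of the statement shape the stream-based builder above assembles for one record; the table name stb1, subtable tb1, and the sample values are illustrative, not taken from this change:

    public class SupTableSqlSketch {
        public static void main(String[] args) {
            // columns [tbname, ts, f1, f2, t1], where t1 is the only tag
            StringBuilder sb = new StringBuilder("insert into");
            sb.append(" ").append("tb1")              // record[idx(tbname)]
                    .append(" using ").append("stb1") // the supertable
                    .append(" tags").append("(1)")    // tag values joined with "," inside "(", ")"
                    .append(" ").append("(ts,f1,f2)") // non-tag field names
                    .append(" values").append("(now,2,3)");
            System.out.println(sb);
            // prints: insert into tb1 using stb1 tags(1) (ts,f1,f2) values(now,2,3)
        }
    }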
@@ -245,11 +285,10 @@ public class DefaultDataHandler implements DataHandler {
         return count;
     }

-    private String buildColumnValue(ColumnMeta colMeta, Record record) throws Exception {
+    private String buildColumnValue(ColumnMeta colMeta, Record record) {
         Column column = record.getColumn(indexOf(colMeta.field));
         TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
-        Column.Type type = column.getType();
-        switch (type) {
+        switch (column.getType()) {
             case DATE: {
                 Date value = column.asDate();
                 switch (timestampPrecision) {
@@ -268,6 +307,8 @@ public class DefaultDataHandler implements DataHandler {
                 if (colMeta.type.equals("TIMESTAMP"))
                     return "\"" + column.asString() + "\"";
                 String value = column.asString();
+                if (value == null)
+                    return "NULL";
                 return "\'" + Utils.escapeSingleQuota(value) + "\'";
             case NULL:
             case BAD:
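The added null check means an absent string value is rendered as the bare SQL keyword rather than a quoted literal. A tiny sketch of the effect (Utils.escapeSingleQuota is omitted here for brevity):

    public class NullValueSketch {
        public static void main(String[] args) {
            String value = null; // what column.asString() yields for a missing field
            String sqlValue = (value == null) ? "NULL" : "'" + value + "'";
            System.out.println(sqlValue); // NULL, not 'null'
        }
    }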
@@ -276,9 +317,8 @@ public class DefaultDataHandler implements DataHandler {
             case DOUBLE:
             case INT:
             case LONG:
-                return column.asString();
             default:
-                throw new Exception("invalid column type: " + type);
+                return column.asString();
         }
     }

@@ -290,7 +330,7 @@ public class DefaultDataHandler implements DataHandler {
         int count = 0;
         TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();

-        List<ColumnMeta> columnMetaList = this.columnMetas.get(table);
+        List<ColumnMeta> columnMetaList = this.tbnameColumnMetasMap.get(table);
         ColumnMeta ts = columnMetaList.stream().filter(colMeta -> colMeta.isPrimaryKey).findFirst().get();

         List<String> lines = new ArrayList<>();
@@ -426,8 +466,8 @@ public class DefaultDataHandler implements DataHandler {
      * else
      * insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)])
      */
-    private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) throws Exception {
-        List<ColumnMeta> columnMetas = this.columnMetas.get(table);
+    private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
+        List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);

         StringBuilder sb = new StringBuilder();
         sb.append("insert into ").append(table).append(" ")
@@ -453,25 +493,11 @@ public class DefaultDataHandler implements DataHandler {
             if (ignoreTagsUnmatched && !tagsAllMatch)
                 continue;

-//            sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
-//                return !colMeta.isTag;
-//            }).map(colMeta -> {
-//                return buildColumnValue(colMeta, record);
-//            }).collect(Collectors.joining(", ", "(", ") ")));
-            sb.append("(");
-            for (int i = 0; i < columnMetas.size(); i++) {
-                ColumnMeta colMeta = columnMetas.get(i);
-                if (colMeta.isTag)
-                    continue;
-                String colValue = buildColumnValue(colMeta, record);
-                if (i == 0) {
-                    sb.append(colValue);
-                } else {
-                    sb.append(",").append(colValue);
-                }
-            }
-            sb.append(")");
-
+            sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+                return !colMeta.isTag;
+            }).map(colMeta -> {
+                return buildColumnValue(colMeta, record);
+            }).collect(Collectors.joining(", ", "(", ") ")));
             validRecords++;
         }

@@ -510,34 +536,21 @@ public class DefaultDataHandler implements DataHandler {
      * table: ["weather"], column: ["ts, f1, f2, f3, t1, t2"]
      * sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...)
      */
-    private int writeBatchToNormalTable(Connection conn, String table, List<Record> recordBatch) throws Exception {
-        List<ColumnMeta> columnMetas = this.columnMetas.get(table);
+    private int writeBatchToNormalTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
+        List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);

         StringBuilder sb = new StringBuilder();
         sb.append("insert into ").append(table)
                 .append(" ")
-                .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
+                .append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
                     return colMeta.field;
                 }).collect(Collectors.joining(",", "(", ")")))
                 .append(" values ");

         for (Record record : recordBatch) {
-//            sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
-//                return buildColumnValue(colMeta, record);
-//            }).collect(Collectors.joining(",", "(", ")")));
-            sb.append("(");
-            for (int i = 0; i < columnMetas.size(); i++) {
-                ColumnMeta colMeta = columnMetas.get(i);
-                if (!columns.contains(colMeta.field))
-                    continue;
-                String colValue = buildColumnValue(colMeta, record);
-                if (i == 0) {
-                    sb.append(colValue);
-                } else {
-                    sb.append(",").append(colValue);
-                }
-            }
-            sb.append(")");
+            sb.append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
+                return buildColumnValue(colMeta, record);
+            }).collect(Collectors.joining(",", "(", ")")));
         }

         String sql = sb.toString();
@@ -5,18 +5,18 @@ import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;

-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.*;
 import java.util.*;
 import java.util.stream.Collectors;

 public class SchemaManager {
     private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class);
+    // private static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = "_";
+    private static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = "";

     private final Connection conn;
     private TimestampPrecision precision;
+    private Map<String, Map<String, String>> tags2tbnameMaps = new HashMap<>();

     public SchemaManager(Connection conn) {
         this.conn = conn;
@@ -124,14 +124,12 @@ public class SchemaManager {
                     }
                 }
             } catch (SQLException e) {
-                LOG.error(e.getMessage(), e);
+                e.printStackTrace();
             }
             colMeta.value = value;
         });

-        LOG.debug("load column metadata of " + table + ": " +
-                columnMetaList.stream().map(ColumnMeta::toString).collect(Collectors.joining(",", "[", "]"))
-        );
+        LOG.debug("load column metadata of " + table + ": " + Arrays.toString(columnMetaList.toArray()));
         ret.put(table, columnMetaList);
     }
     return ret;
@@ -145,9 +143,7 @@ public class SchemaManager {
         tableMeta.tags = rs.getInt("tags");
         tableMeta.tables = rs.getInt("tables");

-        if (LOG.isDebugEnabled()){
-            LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta);
-        }
+        LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta);
         return tableMeta;
     }

@@ -174,4 +170,37 @@ public class SchemaManager {
         return columnMeta;
     }

+    public Map<String, String> loadTagTableNameMap(String table) throws SQLException {
+        if (tags2tbnameMaps.containsKey(table))
+            return tags2tbnameMaps.get(table);
+        Map<String, String> tags2tbname = new HashMap<>();
+        try (Statement stmt = conn.createStatement()) {
+            // describe table
+            List<String> tags = new ArrayList<>();
+            ResultSet rs = stmt.executeQuery("describe " + table);
+            while (rs.next()) {
+                String note = rs.getString("Note");
+                if ("TAG".equals(note)) {
+                    tags.add(rs.getString("Field"));
+                }
+            }
+            // select distinct tbname, t1, t2 from stb
+            rs = stmt.executeQuery("select distinct " + String.join(",", tags) + ",tbname from " + table);
+            while (rs.next()) {
+                ResultSet finalRs = rs;
+                String tagStr = tags.stream().map(t -> {
+                    try {
+                        return finalRs.getString(t);
+                    } catch (SQLException e) {
+                        LOG.error(e.getMessage(), e);
+                    }
+                    return "NULL";
+                }).collect(Collectors.joining(TAG_TABLE_NAME_MAP_KEY_SPLITTER));
+                String tbname = rs.getString("tbname");
+                tags2tbname.put(tagStr, tbname);
+            }
+        }
+        tags2tbnameMaps.put(table, tags2tbname);
+        return tags2tbname;
+    }
 }
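A hedged usage sketch of the new mapping, against the stb3 schema created in SchemaManagerTest further down (tags t1 int, t2 float, t3 nchar(32)); the endpoint and data come from that test setup:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.SQLException;
    import java.util.Map;

    public class TagMapSketch {
        public static void main(String[] args) throws SQLException {
            Connection conn = DriverManager.getConnection(
                    "jdbc:TAOS-RS://192.168.56.105:6041", "root", "taosdata");
            SchemaManager schemaManager = new SchemaManager(conn);
            Map<String, String> tag2Tbname = schemaManager.loadTagTableNameMap("stb3");
            // tag values are joined with the empty TAG_TABLE_NAME_MAP_KEY_SPLITTER,
            // so the tags (1, 1.1, 'abc') key their existing subtable as "11.1abc"
            System.out.println(tag2Tbname.get("11.1abc")); // tb5
            conn.close();
        }
    }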
@@ -4,10 +4,10 @@ import com.alibaba.datax.common.spi.ErrorCode;

 public enum TDengineWriterErrorCode implements ErrorCode {

-    REQUIRED_VALUE("TDengineWriter-00", "parameter value is missing"),
-    ILLEGAL_VALUE("TDengineWriter-01", "invalid parameter value"),
-    RUNTIME_EXCEPTION("TDengineWriter-02", "runtime exception"),
-    TYPE_ERROR("TDengineWriter-03", "data type mapping error");
+    REQUIRED_VALUE("TDengineWriter-00", "缺失必要的值"),
+    ILLEGAL_VALUE("TDengineWriter-01", "值非法"),
+    RUNTIME_EXCEPTION("TDengineWriter-02", "运行时异常"),
+    TYPE_ERROR("TDengineWriter-03", "Datax类型无法正确映射到TDengine类型");

     private final String code;
     private final String description;
@@ -2,7 +2,6 @@ package com.alibaba.datax.plugin.writer.tdenginewriter;

 import com.alibaba.datax.core.Engine;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;

 import java.sql.*;
@@ -10,7 +9,6 @@ import java.text.SimpleDateFormat;
 import java.util.Date;
 import java.util.Random;

-@Ignore
 public class DM2TDengineTest {

     private String host1 = "192.168.0.72";
@@ -5,10 +5,12 @@ import com.alibaba.datax.common.element.LongColumn;
 import com.alibaba.datax.common.element.Record;
 import com.alibaba.datax.common.element.StringColumn;
 import com.alibaba.datax.common.plugin.TaskPluginCollector;
-import com.alibaba.datax.common.spi.Writer;
 import com.alibaba.datax.common.util.Configuration;
 import com.alibaba.datax.core.transport.record.DefaultRecord;
-import org.junit.*;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;

 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -19,7 +21,6 @@ import java.util.Map;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;

-@Ignore
 public class DefaultDataHandlerTest {

     private static final String host = "192.168.1.93";
@@ -28,7 +29,7 @@ public class DefaultDataHandlerTest {
     private final TaskPluginCollector taskPluginCollector = new TDengineWriter.Task().getTaskPluginCollector();

     @Test
-    public void writeSupTableBySQL() throws Exception {
+    public void writeSupTableBySQL() throws SQLException {
         // given
         createSupAndSubTable();
         Configuration configuration = Configuration.from("{" +
@@ -58,7 +59,7 @@ public class DefaultDataHandlerTest {
         Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
         Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
         handler.setTableMetas(tableMetas);
-        handler.setColumnMetas(columnMetas);
+        handler.setTbnameColumnMetasMap(columnMetas);
         handler.setSchemaManager(schemaManager);

         int count = handler.writeBatch(conn, recordList);
@@ -68,7 +69,7 @@ public class DefaultDataHandlerTest {
     }

     @Test
-    public void writeSupTableBySQL_2() throws Exception {
+    public void writeSupTableBySQL_2() throws SQLException {
         // given
         createSupAndSubTable();
         Configuration configuration = Configuration.from("{" +
@@ -96,7 +97,7 @@ public class DefaultDataHandlerTest {
         Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
         Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
         handler.setTableMetas(tableMetas);
-        handler.setColumnMetas(columnMetas);
+        handler.setTbnameColumnMetasMap(columnMetas);
         handler.setSchemaManager(schemaManager);

         int count = handler.writeBatch(conn, recordList);
@@ -106,7 +107,7 @@ public class DefaultDataHandlerTest {
     }

     @Test
-    public void writeSupTableBySchemaless() throws Exception {
+    public void writeSupTableBySchemaless() throws SQLException {
         // given
         createSupTable();
         Configuration configuration = Configuration.from("{" +
@@ -136,7 +137,7 @@ public class DefaultDataHandlerTest {
         Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
         Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
         handler.setTableMetas(tableMetas);
-        handler.setColumnMetas(columnMetas);
+        handler.setTbnameColumnMetasMap(columnMetas);
         handler.setSchemaManager(schemaManager);

         int count = handler.writeBatch(connection, recordList);
@@ -146,7 +147,7 @@ public class DefaultDataHandlerTest {
     }

     @Test
-    public void writeSubTableWithTableName() throws Exception {
+    public void writeSubTableWithTableName() throws SQLException {
         // given
         createSupAndSubTable();
         Configuration configuration = Configuration.from("{" +
@@ -175,7 +176,7 @@ public class DefaultDataHandlerTest {
         Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
         Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
         handler.setTableMetas(tableMetas);
-        handler.setColumnMetas(columnMetas);
+        handler.setTbnameColumnMetasMap(columnMetas);
         handler.setSchemaManager(schemaManager);

         int count = handler.writeBatch(conn, recordList);
@@ -185,7 +186,7 @@ public class DefaultDataHandlerTest {
     }

     @Test
-    public void writeSubTableWithoutTableName() throws Exception {
+    public void writeSubTableWithoutTableName() throws SQLException {
         // given
         createSupAndSubTable();
         Configuration configuration = Configuration.from("{" +
@@ -214,7 +215,7 @@ public class DefaultDataHandlerTest {
         Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
         Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
         handler.setTableMetas(tableMetas);
-        handler.setColumnMetas(columnMetas);
+        handler.setTbnameColumnMetasMap(columnMetas);
         handler.setSchemaManager(schemaManager);

         int count = handler.writeBatch(conn, recordList);
@@ -224,7 +225,7 @@ public class DefaultDataHandlerTest {
     }

     @Test
-    public void writeNormalTable() throws Exception {
+    public void writeNormalTable() throws SQLException {
         // given
         createSupAndSubTable();
         Configuration configuration = Configuration.from("{" +
@@ -253,7 +254,7 @@ public class DefaultDataHandlerTest {
         Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
         Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
         handler.setTableMetas(tableMetas);
-        handler.setColumnMetas(columnMetas);
+        handler.setTbnameColumnMetasMap(columnMetas);
         handler.setSchemaManager(schemaManager);

         int count = handler.writeBatch(conn, recordList);
@@ -0,0 +1,16 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.core.Engine;
+import org.junit.Test;
+
+public class Mongo2TDengineTest {
+
+    @Test
+    public void case01() throws Throwable {
+
+        // when
+        String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/mongo2t.json"};
+        System.setProperty("datax.home", "../target/datax/datax");
+        Engine.entry(params);
+    }
+}
@@ -2,14 +2,12 @@ package com.alibaba.datax.plugin.writer.tdenginewriter;

 import com.alibaba.datax.core.Engine;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;

 import java.sql.*;
 import java.text.SimpleDateFormat;
 import java.util.Random;

-@Ignore
 public class Mysql2TDengineTest {

     private static final String host1 = "192.168.56.105";
@@ -2,12 +2,10 @@ package com.alibaba.datax.plugin.writer.tdenginewriter;

 import com.alibaba.datax.core.Engine;
 import org.junit.Assert;
-import org.junit.Ignore;
 import org.junit.Test;

 import java.sql.*;

-@Ignore
 public class Opentsdb2TDengineTest {

     @Test
@@ -1,6 +1,9 @@
 package com.alibaba.datax.plugin.writer.tdenginewriter;

-import org.junit.*;
+import org.junit.AfterClass;
+import org.junit.Assert;
+import org.junit.BeforeClass;
+import org.junit.Test;

 import java.sql.Connection;
 import java.sql.DriverManager;
@@ -10,7 +13,6 @@ import java.util.Arrays;
 import java.util.List;
 import java.util.Map;

-@Ignore
 public class SchemaManagerTest {

     private static Connection conn;
@@ -59,6 +61,23 @@ public class SchemaManagerTest {
         Assert.assertEquals(4, stb1.size());
     }

+    @Test
+    public void loadTagTableNameMap() throws SQLException {
+        // given
+        SchemaManager schemaManager = new SchemaManager(conn);
+        String table = "stb3";
+
+        // when
+        Map<String, String> tagTableMap = schemaManager.loadTagTableNameMap(table);
+
+        // then
+        Assert.assertEquals(2, tagTableMap.keySet().size());
+        Assert.assertTrue(tagTableMap.containsKey("11.1abc"));
+        Assert.assertTrue(tagTableMap.containsKey("22.2defg"));
+        Assert.assertEquals("tb5", tagTableMap.get("11.1abc"));
+        Assert.assertEquals("tb6", tagTableMap.get("22.2defg"));
+    }
+
     @BeforeClass
     public static void beforeClass() throws SQLException {
         conn = DriverManager.getConnection("jdbc:TAOS-RS://192.168.56.105:6041", "root", "taosdata");
@@ -73,6 +92,9 @@ public class SchemaManagerTest {
             stmt.execute("insert into tb3 using stb2 tags(1,1) values(now, 1, 2, 3)");
             stmt.execute("insert into tb4 using stb2 tags(2,2) values(now, 1, 2, 3)");
             stmt.execute("create table weather(ts timestamp, f1 int, f2 int, f3 int, t1 int, t2 int)");
+            stmt.execute("create table stb3(ts timestamp, f1 int) tags(t1 int, t2 float, t3 nchar(32))");
+            stmt.execute("insert into tb5 using stb3 tags(1,1.1,'abc') values(now, 1)");
+            stmt.execute("insert into tb6 using stb3 tags(2,2.2,'defg') values(now, 2)");
         }
     }

@@ -1,7 +1,7 @@
 package com.alibaba.datax.plugin.writer.tdenginewriter;

 import com.alibaba.datax.core.Engine;
-import org.junit.Ignore;
+import org.junit.Before;
 import org.junit.Test;

 import java.sql.Connection;
@@ -9,7 +9,6 @@ import java.sql.DriverManager;
 import java.sql.SQLException;
 import java.sql.Statement;

-@Ignore
 public class Stream2TDengineTest {

     private String host2 = "192.168.56.105";
@@ -2,14 +2,12 @@ package com.alibaba.datax.plugin.writer.tdenginewriter;

 import com.alibaba.datax.core.Engine;
 import org.junit.Before;
-import org.junit.Ignore;
 import org.junit.Test;

 import java.sql.*;
 import java.text.SimpleDateFormat;
 import java.util.Random;

-@Ignore
 public class TDengine2TDengineTest {

     private static final String host1 = "192.168.56.105";
tdenginewriter/src/test/resources/incremental_sync/clean_env.sh (new executable file, 9 lines)
@@ -0,0 +1,9 @@
+#!/bin/bash
+
+datax_home_dir=$(dirname $(readlink -f "$0"))
+
+curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'drop table if exists db2.stb2;' 192.168.1.93:6041/rest/sql
+curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'create table if not exists db2.stb2 (`ts` TIMESTAMP,`f2` SMALLINT,`f4` BIGINT,`f5` FLOAT,`f6` DOUBLE,`f7` DOUBLE,`f8` BOOL,`f9` NCHAR(100),`f10` NCHAR(200)) TAGS (`f1` TINYINT,`f3` INT);' 192.168.1.93:6041/rest/sql
+
+rm -f ${datax_home_dir}/log/*
+rm -f ${datax_home_dir}/job/*.csv
@@ -0,0 +1,106 @@
+{
+  "job": {
+    "content": [
+      {
+        "reader": {
+          "name": "txtfilereader",
+          "parameter": {
+            "path": [
+              "/root/workspace/tmp/a.txt"
+            ],
+            "encoding": "UTF-8",
+            "column": [
+              {
+                "index": 0,
+                "type": "date",
+                "format": "yyyy-MM-dd HH:mm:ss.SSS"
+              },
+              {
+                "index": 1,
+                "type": "long"
+              },
+              {
+                "index": 2,
+                "type": "long"
+              },
+              {
+                "index": 3,
+                "type": "long"
+              },
+              {
+                "index": 4,
+                "type": "long"
+              },
+              {
+                "index": 5,
+                "type": "double"
+              },
+              {
+                "index": 6,
+                "type": "double"
+              },
+              {
+                "index": 7,
+                "type": "boolean"
+              },
+              {
+                "index": 8,
+                "type": "string"
+              },
+              {
+                "index": 9,
+                "type": "string"
+              },
+              {
+                "index": 10,
+                "type": "date",
+                "format": "yyyy-MM-dd HH:mm:ss.SSS"
+              },
+              {
+                "index": 11,
+                "type": "string"
+              }
+            ],
+            "fieldDelimiter": ","
+          }
+        },
+        "writer": {
+          "name": "tdenginewriter",
+          "parameter": {
+            "username": "root",
+            "password": "taosdata",
+            "column": [
+              "ts",
+              "f1",
+              "f2",
+              "f3",
+              "f4",
+              "f5",
+              "f6",
+              "f7",
+              "f8",
+              "f9",
+              "t1",
+              "tbname"
+            ],
+            "connection": [
+              {
+                "table": [
+                  "stb2"
+                ],
+                "jdbcUrl": "jdbc:TAOS://192.168.1.93:6030/db2"
+              }
+            ],
+            "batchSize": 1000,
+            "ignoreTagsUnmatched": true
+          }
+        }
+      }
+    ],
+    "setting": {
+      "speed": {
+        "channel": 1
+      }
+    }
+  }
+}
@@ -0,0 +1,57 @@
+{
+  "job": {
+    "content": [
+      {
+        "reader": {
+          "name": "txtfilereader",
+          "parameter": {
+            "path": [
+              "/root/workspace/tmp/a.txt"
+            ],
+            "encoding": "UTF-8",
+            "column": [
+              "*"
+            ],
+            "fieldDelimiter": ","
+          }
+        },
+        "writer": {
+          "name": "tdenginewriter",
+          "parameter": {
+            "username": "root",
+            "password": "taosdata",
+            "column": [
+              "ts",
+              "f1",
+              "f2",
+              "f3",
+              "f4",
+              "f5",
+              "f6",
+              "f7",
+              "f8",
+              "f9",
+              "t1",
+              "tbname"
+            ],
+            "connection": [
+              {
+                "table": [
+                  "stb2"
+                ],
+                "jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
+              }
+            ],
+            "batchSize": 1000,
+            "ignoreTagsUnmatched": true
+          }
+        }
+      }
+    ],
+    "setting": {
+      "speed": {
+        "channel": 1
+      }
+    }
+  }
+}
@@ -0,0 +1,62 @@
+{
+  "job": {
+    "content": [
+      {
+        "reader": {
+          "name": "rdbmsreader",
+          "parameter": {
+            "username": "TESTUSER",
+            "password": "test123456",
+            "connection": [
+              {
+                "querySql": [
+                  "select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1;"
+                ],
+                "jdbcUrl": [
+                  "jdbc:dm://192.168.0.72:5236"
+                ]
+              }
+            ],
+            "fetchSize": 1024
+          }
+        },
+        "writer": {
+          "name": "tdenginewriter",
+          "parameter": {
+            "username": "root",
+            "password": "taosdata",
+            "column": [
+              "tbname",
+              "ts",
+              "f1",
+              "f2",
+              "f3",
+              "f4",
+              "f5",
+              "f6",
+              "f7",
+              "f8",
+              "f9",
+              "f10"
+            ],
+            "connection": [
+              {
+                "table": [
+                  "stb2"
+                ],
+                "jdbcUrl": "jdbc:TAOS://192.168.1.93:6030/db2"
+              }
+            ],
+            "batchSize": 1000,
+            "ignoreTagsUnmatched": true
+          }
+        }
+      }
+    ],
+    "setting": {
+      "speed": {
+        "channel": 1
+      }
+    }
+  }
+}
@@ -0,0 +1,62 @@
+{
+  "job": {
+    "content": [
+      {
+        "reader": {
+          "name": "rdbmsreader",
+          "parameter": {
+            "username": "TESTUSER",
+            "password": "test123456",
+            "connection": [
+              {
+                "querySql": [
+                  "select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1;"
+                ],
+                "jdbcUrl": [
+                  "jdbc:dm://192.168.0.72:5236"
+                ]
+              }
+            ],
+            "fetchSize": 1024
+          }
+        },
+        "writer": {
+          "name": "tdenginewriter",
+          "parameter": {
+            "username": "root",
+            "password": "taosdata",
+            "column": [
+              "tbname",
+              "ts",
+              "f1",
+              "f2",
+              "f3",
+              "f4",
+              "f5",
+              "f6",
+              "f7",
+              "f8",
+              "f9",
+              "f10"
+            ],
+            "connection": [
+              {
+                "table": [
+                  "stb2"
+                ],
+                "jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
+              }
+            ],
+            "batchSize": 1000,
+            "ignoreTagsUnmatched": true
+          }
+        }
+      }
+    ],
+    "setting": {
+      "speed": {
+        "channel": 1
+      }
+    }
+  }
+}
@ -0,0 +1,63 @@
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "rdbmsreader",
          "parameter": {
            "username": "TESTUSER",
            "password": "test123456",
            "connection": [
              {
                "querySql": [
                  "select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1"
                ],
                "jdbcUrl": [
                  "jdbc:dm://192.168.0.72:5236"
                ]
              }
            ],
            "where": "1=1",
            "fetchSize": 1024
          }
        },
        "writer": {
          "name": "tdenginewriter",
          "parameter": {
            "username": "root",
            "password": "taosdata",
            "column": [
              "tbname",
              "ts",
              "f1",
              "f2",
              "f3",
              "f4",
              "f5",
              "f6",
              "f7",
              "f8",
              "f9",
              "f10"
            ],
            "connection": [
              {
                "table": [
                  "stb2"
                ],
                "jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
              }
            ],
            "batchSize": 1000,
            "ignoreTagsUnmatched": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
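The "where": "1=1" in the job above is a deliberate placeholder: the incremental-sync script that follows rewrites it into a concrete time window before each run. That substitution in isolation looks like this (the datetime values are made up for illustration):

# rewrite the placeholder predicate into a concrete time range
WHERE="ts >= '2022-01-01 00:00:00' and ts < '2022-01-02 00:00:00'"
sed "s/1=1/$WHERE/g" dm2t-update.json > dm2t_run.json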
57  tdenginewriter/src/test/resources/incremental_sync/dm2t_sync.sh  Executable file
@ -0,0 +1,57 @@
#!/bin/bash

set -e
#set -x

datax_home_dir=$(dirname $(readlink -f "$0"))
table_name="stb1"
update_key="ts"

while getopts "hd:t:" arg; do
  case $arg in
    d)
      datax_home_dir=$OPTARG
      ;;
    t)
      table_name=$OPTARG
      ;;
    h)
      echo "Usage: $(basename $0) -d [datax_home_dir] -t [table_name]"
      echo "  -h  help"
      exit 0
      ;;
    ?) # unknown option
      echo "unknown argument"
      exit 1
      ;;
  esac
done

if [[ -e ${datax_home_dir}/job/${table_name}.csv ]]; then
  MAX_TIME=$(cat ${datax_home_dir}/job/${table_name}.csv)
else
  MAX_TIME="null"
fi
current_datetime=$(date +"%Y-%m-%d %H:%M:%S")
current_timestamp=$(date +%s)

if [ "$MAX_TIME" != "null" ]; then
  # incremental run: substitute the "1=1" placeholder with a time-range predicate
  WHERE="${update_key} >= '$MAX_TIME' and ${update_key} < '$current_datetime'"
  sed "s/1=1/$WHERE/g" ${datax_home_dir}/job/dm2t-update.json >${datax_home_dir}/job/dm2t_${current_timestamp}.json
  echo "incremental data synchronization, from '${MAX_TIME}' to '${current_datetime}'"
  python ${datax_home_dir}/bin/datax.py ${datax_home_dir}/job/dm2t_${current_timestamp}.json 1> /dev/null 2>&1
else
  echo "full data synchronization, to '${current_datetime}'"
  python ${datax_home_dir}/bin/datax.py ${datax_home_dir}/job/dm2t-update.json 1> /dev/null 2>&1
fi

# note: with "set -e" above, a failed datax.py run exits the script before this check
if [[ $? -ne 0 ]]; then
  echo "datax migration job failed"
else
  echo ${current_datetime} >$datax_home_dir/job/${table_name}.csv
  echo "datax migration job success"
fi

rm -rf ${datax_home_dir}/job/dm2t_${current_timestamp}.json

#while true; do ./dm2t_sync.sh; sleep 5s; done
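Typical invocations of the script, assuming it sits in the DataX home next to the job/ directory (the deployment path matches the upload script below):

# one incremental pass for supertable stb1
./dm2t_sync.sh -d /root/workspace/tmp/datax -t stb1
# or poll continuously, as the commented last line of the script suggests
while true; do ./dm2t_sync.sh; sleep 5s; done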
@ -0,0 +1,50 @@
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "tdenginereader",
          "parameter": {
            "username": "root",
            "password": "taosdata",
            "column": [
              "*"
            ],
            "connection": [
              {
                "table": [
                  "stb1"
                ],
                "jdbcUrl": "jdbc:TAOS://192.168.56.105:6030/db1"
              }
            ]
          }
        },
        "writer": {
          "name": "rdbmswriter",
          "parameter": {
            "connection": [
              {
                "table": [
                  "stb2"
                ],
                "jdbcUrl": "jdbc:dm://192.168.0.72:5236"
              }
            ],
            "username": "TESTUSER",
            "password": "test123456",
            "table": "stb2",
            "column": [
              "*"
            ]
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
@ -0,0 +1,50 @@
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "tdenginereader",
          "parameter": {
            "username": "root",
            "password": "taosdata",
            "column": [
              "*"
            ],
            "connection": [
              {
                "table": [
                  "stb1"
                ],
                "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1"
              }
            ]
          }
        },
        "writer": {
          "name": "rdbmswriter",
          "parameter": {
            "connection": [
              {
                "table": [
                  "stb2"
                ],
                "jdbcUrl": "jdbc:dm://192.168.0.72:5236"
              }
            ],
            "username": "TESTUSER",
            "password": "test123456",
            "table": "stb2",
            "column": [
              "*"
            ]
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
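The two t2dm jobs read everything back from TDengine and hand it to rdbmswriter with column "*", so the DM-side stb2 must already have columns matching the query result in order. After a run, a quick spot-check of the TDengine source side (host and credentials are the test values from these configs):

# count the source rows that should have been copied to DM
taos -h 192.168.56.105 -s "select count(*) from db1.stb1;"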
13  tdenginewriter/src/test/resources/incremental_sync/upload.sh  Executable file
@ -0,0 +1,13 @@
#!/bin/bash

scp t2dm-restful.json root@192.168.56.105:/root/workspace/tmp/datax/job
scp t2dm-jni.json root@192.168.56.105:/root/workspace/tmp/datax/job
scp dm2t-restful.json root@192.168.56.105:/root/workspace/tmp/datax/job
scp dm2t-jni.json root@192.168.56.105:/root/workspace/tmp/datax/job
scp dm2t-update.json root@192.168.56.105:/root/workspace/tmp/datax/job
scp csv2t-restful.json root@192.168.56.105:/root/workspace/tmp/datax/job
scp csv2t-jni.json root@192.168.56.105:/root/workspace/tmp/datax/job

scp dm2t_sync.sh root@192.168.56.105:/root/workspace/tmp/datax
scp clean_env.sh root@192.168.56.105:/root/workspace/tmp/datax
66  tdenginewriter/src/test/resources/mongo2t.json  Normal file
@ -0,0 +1,66 @@
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "mongodbreader",
          "parameter": {
            "address": [
              "192.168.1.213:27017"
            ],
            "userName": "",
            "userPassword": "",
            "dbName": "testdb",
            "collectionName": "monitor_data",
            "column": [
              {
                "name": "ct",
                "type": "date"
              },
              {
                "name": "pv",
                "type": "float"
              },
              {
                "name": "tv",
                "type": "float"
              },
              {
                "name": "pid",
                "type": "float"
              }
            ]
          }
        },
        "writer": {
          "name": "tdenginewriter",
          "parameter": {
            "username": "root",
            "password": "hmdata",
            "column": [
              "ts",
              "pressure",
              "temperature",
              "position_id"
            ],
            "connection": [
              {
                "table": [
                  "pipeline_data"
                ],
                "jdbcUrl": "jdbc:TAOS-RS://192.168.1.213:6041/mongo3040"
              }
            ],
            "batchSize": 1000,
            "ignoreTagsUnmatched": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
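The mongo2t job maps MongoDB fields to TDengine columns by position: ct -> ts, pv -> pressure, tv -> temperature, pid -> position_id. The writer therefore expects a plain table of exactly that shape in database mongo3040; a hedged sketch of the assumed DDL, with types guessed from the reader's column config:

# assumed destination table; float types mirror the reader's "float" columns
taos -h 192.168.1.213 -s "create table if not exists mongo3040.pipeline_data (ts timestamp, pressure float, temperature float, position_id float);"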