feat: migrate MongoDB data to TDengine when no tbname column is specified #TS-1538 #TS-1539

zyyang 2022-06-15 10:57:10 +08:00
parent 03465162ed
commit beaf7aa620
18 changed files with 774 additions and 63 deletions
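When the configured column list carries no tbname column, the writer now resolves each record's target sub-table from its tag values: SchemaManager.loadTagTableNameMap builds a tag-to-table-name map for the super table, getTagString turns a record's tag columns into the same key, records whose key matches an existing sub-table are written with plain insert statements, and the remaining records fall back to the schemaless path. A minimal sketch of that lookup follows (the class name and literal values are illustrative; the "11.1abc"/tb5 pairs mirror the SchemaManagerTest fixture added in this commit):

import java.util.HashMap;
import java.util.Map;

public class TagToTbnameLookupSketch {
    public static void main(String[] args) {
        // Hypothetical map, as loaded via "describe <stable>" plus "select distinct <tags>,tbname from <stable>":
        // each key is a row's tag values joined with the empty splitter, exactly like getTagString() builds it.
        Map<String, String> tag2Tbname = new HashMap<>();
        tag2Tbname.put("11.1abc", "tb5");   // tags t1=1, t2=1.1, t3='abc'
        tag2Tbname.put("22.2defg", "tb6");  // tags t1=2, t2=2.2, t3='defg'

        // A record carrying tag values (1, 1.1, 'abc') produces the key "11.1abc" ...
        String tagKey = "1" + "1.1" + "abc";

        if (tag2Tbname.containsKey(tagKey)) {
            // ... so it is routed to sub-table tb5 and written with an ordinary insert statement.
            System.out.println("write to existing sub-table " + tag2Tbname.get(tagKey));
        } else {
            // Unknown tag combinations keep the old behaviour: schemaless write against the super table.
            System.out.println("fall back to schemaless write");
        }
    }
}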

View File

@@ -29,12 +29,6 @@
</exclusions>
</dependency>
-<dependency>
-<groupId>com.alibaba</groupId>
-<artifactId>fastjson</artifactId>
-<version>1.2.78</version>
-</dependency>
<dependency>
<groupId>com.alibaba.datax.tdenginewriter</groupId>
<artifactId>tdenginewriter</artifactId>
@@ -45,13 +39,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
-<version>2.0.37</version>
-<exclusions>
-<exclusion>
-<groupId>com.alibaba</groupId>
-<artifactId>fastjson</artifactId>
-</exclusion>
-</exclusions>
+<version>2.0.39</version>
</dependency>
<dependency>

View File

@@ -20,22 +20,10 @@
<dependencies>
-<dependency>
-<groupId>com.alibaba</groupId>
-<artifactId>fastjson</artifactId>
-<version>1.2.78</version>
-</dependency>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
-<version>2.0.37</version>
-<exclusions>
-<exclusion>
-<groupId>com.alibaba</groupId>
-<artifactId>fastjson</artifactId>
-</exclusion>
-</exclusions>
+<version>2.0.39</version>
</dependency>
<dependency>
@@ -74,6 +62,7 @@
<version>5.1.49</version>
<scope>test</scope>
</dependency>
<!-- add dm8 jdbc jar dependency -->
<!-- <dependency>-->
<!-- <groupId>com.dameng</groupId>-->

View File

@@ -17,19 +17,20 @@ import java.sql.*;
import java.util.*;
import java.util.Date;
import java.util.stream.Collectors;
+import java.util.stream.IntStream;
public class DefaultDataHandler implements DataHandler {
+private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
static {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
} catch (ClassNotFoundException e) {
-e.printStackTrace();
+LOG.error(e.getMessage(), e);
}
}
-private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
private final TaskPluginCollector taskPluginCollector;
private String username;
private String password;
@@ -47,15 +48,15 @@ public class DefaultDataHandler implements DataHandler {
this.tableMetas = tableMetas;
}
-public void setColumnMetas(Map<String, List<ColumnMeta>> columnMetas) {
-this.columnMetas = columnMetas;
+public void setTbnameColumnMetasMap(Map<String, List<ColumnMeta>> tbnameColumnMetasMap) {
+this.tbnameColumnMetasMap = tbnameColumnMetasMap;
}
public void setSchemaManager(SchemaManager schemaManager) {
this.schemaManager = schemaManager;
}
-private Map<String, List<ColumnMeta>> columnMetas;
+private Map<String, List<ColumnMeta>> tbnameColumnMetasMap;
public DefaultDataHandler(Configuration configuration, TaskPluginCollector taskPluginCollector) {
this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME);
@@ -73,14 +74,13 @@ public class DefaultDataHandler implements DataHandler {
int count = 0;
int affectedRows = 0;
try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established.");
// prepare table_name -> table_meta
this.schemaManager = new SchemaManager(conn);
this.tableMetas = schemaManager.loadTableMeta(tables);
// prepare table_name -> column_meta
-this.columnMetas = schemaManager.loadColumnMetas(tables);
+this.tbnameColumnMetasMap = schemaManager.loadColumnMetas(tables);
List<Record> recordBatch = new ArrayList<>();
Record record;
@@ -152,10 +152,12 @@ public class DefaultDataHandler implements DataHandler {
TableMeta tableMeta = tableMetas.get(table);
switch (tableMeta.tableType) {
case SUP_TABLE: {
-if (columns.contains("tbname"))
+if (columns.contains("tbname")) {
affectedRows += writeBatchToSupTableBySQL(conn, table, recordBatch);
-else
-affectedRows += writeBatchToSupTableBySchemaless(conn, table, recordBatch);
+} else {
+Map<String, String> tag2Tbname = schemaManager.loadTagTableNameMap(table);
+affectedRows += writeBatchToSupTableWithoutTbname(conn, table, recordBatch, tag2Tbname);
+}
}
break;
case SUB_TABLE:
@@ -169,13 +171,82 @@ public class DefaultDataHandler implements DataHandler {
return affectedRows;
}
private int writeBatchToSupTableWithoutTbname(Connection conn, String table, List<Record> recordBatch, Map<String, String> tag2Tbname) throws SQLException {
List<ColumnMeta> columnMetas = tbnameColumnMetasMap.get(table);
List<Record> subTableExist = filterSubTableExistRecords(recordBatch, columnMetas, tag2Tbname);
List<Record> subTableNotExist = filterSubTableNotExistRecords(recordBatch, columnMetas, tag2Tbname);
int affectedRows = 0;
Map<String, List<Record>> subTableRecordsMap = splitRecords(subTableExist, columnMetas, tag2Tbname);
List<String> subTables = new ArrayList<>(subTableRecordsMap.keySet());
this.tbnameColumnMetasMap.putAll(schemaManager.loadColumnMetas(subTables));
for (String subTable : subTableRecordsMap.keySet()) {
List<Record> subTableRecords = subTableRecordsMap.get(subTable);
affectedRows += writeBatchToNormalTable(conn, subTable, subTableRecords);
}
if (!subTableNotExist.isEmpty())
affectedRows += writeBatchToSupTableBySchemaless(conn, table, subTableNotExist);
return affectedRows;
}
private List<Record> filterSubTableExistRecords(List<Record> recordBatch, List<ColumnMeta> columnMetas, Map<String, String> tag2Tbname) {
return recordBatch.stream().filter(record -> {
String tagStr = getTagString(columnMetas, record);
return tag2Tbname.containsKey(tagStr);
}).collect(Collectors.toList());
}
private List<Record> filterSubTableNotExistRecords(List<Record> recordBatch, List<ColumnMeta> columnMetas, Map<String, String> tag2Tbname) {
return recordBatch.stream().filter(record -> {
String tagStr = getTagString(columnMetas, record);
return !tag2Tbname.containsKey(tagStr);
}).collect(Collectors.toList());
}
private Map<String, List<Record>> splitRecords(List<Record> subTableExist, List<ColumnMeta> columnMetas, Map<String, String> tag2Tbname) {
Map<String, List<Record>> ret = new HashMap<>();
for (Record record : subTableExist) {
String tagstr = getTagString(columnMetas, record);
String tbname = tag2Tbname.get(tagstr);
if (ret.containsKey(tbname)) {
ret.get(tbname).add(record);
} else {
List<Record> list = new ArrayList<>();
list.add(record);
ret.put(tbname, list);
}
}
return ret;
}
private String getTagString(List<ColumnMeta> columnMetas, Record record) {
return IntStream.range(0, columnMetas.size()).mapToObj(colIndex -> {
ColumnMeta columnMeta = columnMetas.get(colIndex);
if (columnMeta.isTag) {
Column column = record.getColumn(colIndex);
switch (columnMeta.type) {
case "TINYINT":
case "SMALLINT":
case "INT":
case "BIGINT":
return column.asLong().toString();
default:
return column.asString();
}
}
return "";
}).collect(Collectors.joining());
}
/**
* insert into record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
* record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
* record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
*/
private int writeBatchToSupTableBySQL(Connection conn, String table, List<Record> recordBatch) throws SQLException {
-List<ColumnMeta> columnMetas = this.columnMetas.get(table);
+List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);
StringBuilder sb = new StringBuilder("insert into");
for (Record record : recordBatch) {
@@ -257,7 +328,7 @@ public class DefaultDataHandler implements DataHandler {
int count = 0;
TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
-List<ColumnMeta> columnMetaList = this.columnMetas.get(table);
+List<ColumnMeta> columnMetaList = this.tbnameColumnMetasMap.get(table);
ColumnMeta ts = columnMetaList.stream().filter(colMeta -> colMeta.isPrimaryKey).findFirst().get();
List<String> lines = new ArrayList<>();
@@ -394,7 +465,7 @@ public class DefaultDataHandler implements DataHandler {
* insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)])
*/
private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
-List<ColumnMeta> columnMetas = this.columnMetas.get(table);
+List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);
StringBuilder sb = new StringBuilder();
sb.append("insert into ").append(table).append(" ")
@@ -464,18 +535,18 @@ public class DefaultDataHandler implements DataHandler {
* sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...)
*/
private int writeBatchToNormalTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
-List<ColumnMeta> columnMetas = this.columnMetas.get(table);
+List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);
StringBuilder sb = new StringBuilder();
sb.append("insert into ").append(table)
.append(" ")
-.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
+.append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
return colMeta.field;
}).collect(Collectors.joining(",", "(", ")")))
.append(" values ");
for (Record record : recordBatch) {
-sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
+sb.append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
return buildColumnValue(colMeta, record);
}).collect(Collectors.joining(",", "(", ")")));
}

View File

@@ -5,17 +5,18 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.sql.Connection;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
+import java.sql.*;
import java.util.*;
+import java.util.stream.Collectors;
public class SchemaManager {
private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class);
+// private static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = "_";
+private static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = "";
private final Connection conn;
private TimestampPrecision precision;
+private Map<String, Map<String, String>> tags2tbnameMaps = new HashMap<>();
public SchemaManager(Connection conn) {
this.conn = conn;
@@ -169,4 +170,37 @@ public class SchemaManager {
return columnMeta;
}
public Map<String, String> loadTagTableNameMap(String table) throws SQLException {
if (tags2tbnameMaps.containsKey(table))
return tags2tbnameMaps.get(table);
Map<String, String> tags2tbname = new HashMap<>();
try (Statement stmt = conn.createStatement()) {
// describe table
List<String> tags = new ArrayList<>();
ResultSet rs = stmt.executeQuery("describe " + table);
while (rs.next()) {
String note = rs.getString("Note");
if ("TAG".equals(note)) {
tags.add(rs.getString("Field"));
}
}
// select distinct tbname, t1, t2 from stb
rs = stmt.executeQuery("select distinct " + String.join(",", tags) + ",tbname from " + table);
while (rs.next()) {
ResultSet finalRs = rs;
String tagStr = tags.stream().map(t -> {
try {
return finalRs.getString(t);
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
return "NULL";
}).collect(Collectors.joining(TAG_TABLE_NAME_MAP_KEY_SPLITTER));
String tbname = rs.getString("tbname");
tags2tbname.put(tagStr, tbname);
}
}
tags2tbnameMaps.put(table, tags2tbname);
return tags2tbname;
}
}
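The map itself comes from the new SchemaManager.loadTagTableNameMap above: it runs describe <table> to find the tag columns, then select distinct <tags>,tbname from <table>, keys each row's tag values (joined with TAG_TABLE_NAME_MAP_KEY_SPLITTER) to its tbname, and caches the result per table. A hedged usage sketch, reusing the stb3/tb5/tb6 fixture that SchemaManagerTest sets up later in this commit (the host and credentials match that test; the database name db1 and the sketch class are placeholders):

package com.alibaba.datax.plugin.writer.tdenginewriter;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Map;

public class LoadTagTableNameMapSketch {
    public static void main(String[] args) throws SQLException {
        // Placeholder database; assumes the stb3 fixture from SchemaManagerTest:
        // tb5 has tags (1, 1.1, 'abc'), tb6 has tags (2, 2.2, 'defg').
        String url = "jdbc:TAOS-RS://192.168.56.105:6041/db1";
        try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) {
            SchemaManager schemaManager = new SchemaManager(conn);
            Map<String, String> tag2Tbname = schemaManager.loadTagTableNameMap("stb3");
            // Expected content: {"11.1abc" -> "tb5", "22.2defg" -> "tb6"}.
            // A second call for "stb3" is served from the tags2tbnameMaps cache without another query.
            System.out.println(tag2Tbname);
        }
    }
}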

View File

@@ -5,7 +5,6 @@ import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
-import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.record.DefaultRecord;
import org.junit.AfterClass;
@@ -60,7 +59,7 @@ public class DefaultDataHandlerTest {
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
-handler.setColumnMetas(columnMetas);
+handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);
@@ -98,7 +97,7 @@ public class DefaultDataHandlerTest {
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
-handler.setColumnMetas(columnMetas);
+handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);
@@ -138,7 +137,7 @@ public class DefaultDataHandlerTest {
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
-handler.setColumnMetas(columnMetas);
+handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(connection, recordList);
@@ -177,7 +176,7 @@ public class DefaultDataHandlerTest {
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
-handler.setColumnMetas(columnMetas);
+handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);
@@ -216,7 +215,7 @@ public class DefaultDataHandlerTest {
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
-handler.setColumnMetas(columnMetas);
+handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);
@@ -255,7 +254,7 @@ public class DefaultDataHandlerTest {
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
-handler.setColumnMetas(columnMetas);
+handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);

View File

@@ -0,0 +1,16 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.core.Engine;
import org.junit.Test;
public class Mongo2TDengineTest {
@Test
public void case01() throws Throwable {
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/mongo2t.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
}

View File

@@ -1,6 +1,5 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
-import com.alibaba.fastjson.util.TypeUtils;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -62,6 +61,23 @@ public class SchemaManagerTest {
Assert.assertEquals(4, stb1.size());
}
@Test
public void loadTagTableNameMap() throws SQLException {
// given
SchemaManager schemaManager = new SchemaManager(conn);
String table = "stb3";
// when
Map<String, String> tagTableMap = schemaManager.loadTagTableNameMap(table);
// then
Assert.assertEquals(2, tagTableMap.keySet().size());
Assert.assertTrue(tagTableMap.containsKey("11.1abc"));
Assert.assertTrue(tagTableMap.containsKey("22.2defg"));
Assert.assertEquals("tb5", tagTableMap.get("11.1abc"));
Assert.assertEquals("tb6", tagTableMap.get("22.2defg"));
}
@BeforeClass
public static void beforeClass() throws SQLException {
conn = DriverManager.getConnection("jdbc:TAOS-RS://192.168.56.105:6041", "root", "taosdata");
@@ -76,6 +92,9 @@ public class SchemaManagerTest {
stmt.execute("insert into tb3 using stb2 tags(1,1) values(now, 1, 2, 3)");
stmt.execute("insert into tb4 using stb2 tags(2,2) values(now, 1, 2, 3)");
stmt.execute("create table weather(ts timestamp, f1 int, f2 int, f3 int, t1 int, t2 int)");
stmt.execute("create table stb3(ts timestamp, f1 int) tags(t1 int, t2 float, t3 nchar(32))");
stmt.execute("insert into tb5 using stb3 tags(1,1.1,'abc') values(now, 1)");
stmt.execute("insert into tb6 using stb3 tags(2,2.2,'defg') values(now, 2)");
}
}

View File

@@ -0,0 +1,9 @@
#!/bin/bash
datax_home_dir=$(dirname $(readlink -f "$0"))
curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'drop table if exists db2.stb2;' 192.168.1.93:6041/rest/sql
curl -H 'Authorization: Basic cm9vdDp0YW9zZGF0YQ==' -d 'create table if not exists db2.stb2 (`ts` TIMESTAMP,`f2` SMALLINT,`f4` BIGINT,`f5` FLOAT,`f6` DOUBLE,`f7` DOUBLE,`f8` BOOL,`f9` NCHAR(100),`f10` NCHAR(200)) TAGS (`f1` TINYINT,`f3` INT);' 192.168.1.93:6041/rest/sql
rm -f ${datax_home_dir}/log/*
rm -f ${datax_home_dir}/job/*.csv

View File

@@ -0,0 +1,106 @@
{
"job": {
"content": [
{
"reader": {
"name": "txtfilereader",
"parameter": {
"path": [
"/root/workspace/tmp/a.txt"
],
"encoding": "UTF-8",
"column": [
{
"index": 0,
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS"
},
{
"index": 1,
"type": "long"
},
{
"index": 2,
"type": "long"
},
{
"index": 3,
"type": "long"
},
{
"index": 4,
"type": "long"
},
{
"index": 5,
"type": "double"
},
{
"index": 6,
"type": "double"
},
{
"index": 7,
"type": "boolean"
},
{
"index": 8,
"type": "string"
},
{
"index": 9,
"type": "string"
},
{
"index": 10,
"type": "date",
"format": "yyyy-MM-dd HH:mm:ss.SSS"
},
{
"index": 11,
"type": "string"
}
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"t1",
"tbname"
],
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:TAOS://192.168.1.93:6030/db2"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@@ -0,0 +1,57 @@
{
"job": {
"content": [
{
"reader": {
"name": "txtfilereader",
"parameter": {
"path": [
"/root/workspace/tmp/a.txt"
],
"encoding": "UTF-8",
"column": [
"*"
],
"fieldDelimiter": ","
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"t1",
"tbname"
],
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@@ -0,0 +1,62 @@
{
"job": {
"content": [
{
"reader": {
"name": "rdbmsreader",
"parameter": {
"username": "TESTUSER",
"password": "test123456",
"connection": [
{
"querySql": [
"select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1;"
],
"jdbcUrl": [
"jdbc:dm://192.168.0.72:5236"
]
}
],
"fetchSize": 1024
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"tbname",
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"f10"
],
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:TAOS://192.168.1.93:6030/db2"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@@ -0,0 +1,62 @@
{
"job": {
"content": [
{
"reader": {
"name": "rdbmsreader",
"parameter": {
"username": "TESTUSER",
"password": "test123456",
"connection": [
{
"querySql": [
"select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1;"
],
"jdbcUrl": [
"jdbc:dm://192.168.0.72:5236"
]
}
],
"fetchSize": 1024
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"tbname",
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"f10"
],
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@@ -0,0 +1,63 @@
{
"job": {
"content": [
{
"reader": {
"name": "rdbmsreader",
"parameter": {
"username": "TESTUSER",
"password": "test123456",
"connection": [
{
"querySql": [
"select concat(concat(concat('t', f1), '_'),f3) as tbname,* from stb1"
],
"jdbcUrl": [
"jdbc:dm://192.168.0.72:5236"
]
}
],
"where": "1=1",
"fetchSize": 1024
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"tbname",
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"f10"
],
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@@ -0,0 +1,57 @@
#!/bin/bash
set -e
#set -x
datax_home_dir=$(dirname $(readlink -f "$0"))
table_name="stb1"
update_key="ts"
while getopts "hd:t:k:" arg; do
case $arg in
d)
datax_home_dir=$OPTARG
;;
t)
table_name=$OPTARG
;;
k)
update_key=$OPTARG
;;
h)
echo "Usage: $(basename $0) -d [datax_home_dir] -t [table_name] -k [update_key]"
echo " -h help"
exit 0
;;
?) # unknown option
echo "unknown argument"
exit 1
;;
esac
done
if [[ -e ${datax_home_dir}/job/${table_name}.csv ]]; then
MAX_TIME=$(cat ${datax_home_dir}/job/${table_name}.csv)
else
MAX_TIME="null"
fi
current_datetime=$(date +"%Y-%m-%d %H:%M:%S")
current_timestamp=$(date +%s)
if [ "$MAX_TIME" != "null" ]; then
WHERE="${update_key} >= '$MAX_TIME' and ${update_key} < '$current_datetime'"
sed "s/1=1/$WHERE/g" ${datax_home_dir}/job/dm2t-update.json >${datax_home_dir}/job/dm2t_${current_timestamp}.json
echo "incremental data synchronization, from '${MAX_TIME}' to '${current_datetime}'"
python ${datax_home_dir}/bin/datax.py ${datax_home_dir}/job/dm2t_${current_timestamp}.json 1> /dev/null 2>&1
else
echo "full data synchronization, to '${current_datetime}'"
python ${datax_home_dir}/bin/datax.py ${datax_home_dir}/job/dm2t-update.json 1> /dev/null 2>&1
fi
if [[ $? -ne 0 ]]; then
echo "datax migration job falied"
else
echo ${current_datetime} >$datax_home_dir/job/${table_name}.csv
echo "datax migration job success"
fi
rm -rf ${datax_home_dir}/job/dm2t_${current_timestamp}.json
#while true; do ./dm2t_sync.sh; sleep 5s; done

View File

@@ -0,0 +1,50 @@
{
"job": {
"content": [
{
"reader": {
"name": "tdenginereader",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"*"
],
"connection": [
{
"table": [
"stb1"
],
"jdbcUrl": "jdbc:TAOS://192.168.56.105:6030/db1"
}
]
}
},
"writer": {
"name": "rdbmswriter",
"parameter": {
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:dm://192.168.0.72:5236"
}
],
"username": "TESTUSER",
"password": "test123456",
"table": "stb2",
"column": [
"*"
]
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@@ -0,0 +1,50 @@
{
"job": {
"content": [
{
"reader": {
"name": "tdenginereader",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"*"
],
"connection": [
{
"table": [
"stb1"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1"
}
]
}
},
"writer": {
"name": "rdbmswriter",
"parameter": {
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:dm://192.168.0.72:5236"
}
],
"username": "TESTUSER",
"password": "test123456",
"table": "stb2",
"column": [
"*"
]
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@@ -0,0 +1,13 @@
#!/bin/bash
scp t2dm-restful.json root@192.168.56.105:/root/workspace/tmp/datax/job
scp t2dm-jni.json root@192.168.56.105:/root/workspace/tmp/datax/job
scp dm2t-restful.json root@192.168.56.105:/root/workspace/tmp/datax/job
scp dm2t-jni.json root@192.168.56.105:/root/workspace/tmp/datax/job
scp dm2t-update.json root@192.168.56.105:/root/workspace/tmp/datax/job
scp csv2t-restful.json root@192.168.56.105:/root/workspace/tmp/datax/job
scp csv2t-jni.json root@192.168.56.105:/root/workspace/tmp/datax/job
scp dm2t_sync.sh root@192.168.56.105:/root/workspace/tmp/datax
scp clean_env.sh root@192.168.56.105:/root/workspace/tmp/datax

View File

@@ -0,0 +1,66 @@
{
"job": {
"content": [
{
"reader": {
"name": "mongodbreader",
"parameter": {
"address": [
"192.168.1.213:27017"
],
"userName": "",
"userPassword": "",
"dbName": "testdb",
"collectionName": "monitor_data",
"column": [
{
"name": "ct",
"type": "date"
},
{
"name": "pv",
"type": "float"
},
{
"name": "tv",
"type": "float"
},
{
"name": "pid",
"type": "float"
}
]
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "hmdata",
"column": [
"ts",
"pressure",
"temperature",
"position_id"
],
"connection": [
{
"table": [
"pipeline_data"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.1.213:6041/mongo3040"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}