This commit is contained in:
zyyang 2022-02-16 10:06:06 +08:00
parent df3ea169b0
commit 67dd21edfe
25 changed files with 743 additions and 227 deletions

View File

@ -38,6 +38,12 @@
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3-version}</version>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
@ -63,10 +69,12 @@
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>${commons-lang3-version}</version>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>5.1.49</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>

View File

@ -4,6 +4,5 @@ import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
public interface DataHandler {
int handle(RecordReceiver lineReceiver, TaskPluginCollector collector);
}

View File

@ -14,10 +14,8 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.ArrayList;
import java.util.*;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
public class DefaultDataHandler implements DataHandler {
@ -83,6 +81,11 @@ public class DefaultDataHandler implements DataHandler {
}
count++;
}
if (!recordBatch.isEmpty()) {
affectedRows = writeBatch(conn, recordBatch);
recordBatch.clear();
}
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
@ -302,15 +305,6 @@ public class DefaultDataHandler implements DataHandler {
if (colMeta.type.equals("TIMESTAMP"))
return dateAsLong(column) + "i64";
return "L'" + column.asString() + "'";
case BYTES:
case STRING:
if (colMeta.type.equals("TIMESTAMP"))
return column.asString() + "i64";
String value = column.asString();
if (colMeta.type.startsWith("BINARY"))
return "'" + Utils.escapeSingleQuota(value) + "'";
if (colMeta.type.startsWith("NCHAR"))
return "L'" + Utils.escapeSingleQuota(value) + "'";
case NULL:
case BAD:
return "NULL";
@ -331,6 +325,16 @@ public class DefaultDataHandler implements DataHandler {
if (colMeta.type.equals("BIGINT"))
return column.asString() + "i64";
}
case BYTES:
case STRING:
if (colMeta.type.equals("TIMESTAMP"))
return column.asString() + "i64";
String value = column.asString();
value = value.replace("\"", "\\\"");
if (colMeta.type.startsWith("BINARY"))
return "\"" + value + "\"";
if (colMeta.type.startsWith("NCHAR"))
return "L\"" + value + "\"";
case BOOL:
default:
return column.asString();
@ -363,7 +367,9 @@ public class DefaultDataHandler implements DataHandler {
boolean tagsAllMatch = columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return colMeta.isTag;
}).allMatch(colMeta -> {
return record.getColumn(indexOf(colMeta.field)).asString().equals(colMeta.value.toString());
Column column = record.getColumn(indexOf(colMeta.field));
boolean equals = equals(column, colMeta);
return equals;
});
if (ignoreTagsUnmatched && !tagsAllMatch)
@ -386,6 +392,28 @@ public class DefaultDataHandler implements DataHandler {
return executeUpdate(conn, sql);
}
private boolean equals(Column column, ColumnMeta colMeta) {
switch (column.getType()) {
case BOOL:
return column.asBoolean().equals(Boolean.valueOf(colMeta.value.toString()));
case INT:
case LONG:
return column.asLong().equals(Long.valueOf(colMeta.value.toString()));
case DOUBLE:
return column.asDouble().equals(Double.valueOf(colMeta.value.toString()));
case NULL:
return colMeta.value == null;
case DATE:
return column.asDate().getTime() == ((Timestamp) colMeta.value).getTime();
case BAD:
case BYTES:
return Arrays.equals(column.asBytes(), (byte[]) colMeta.value);
case STRING:
default:
return column.asString().equals(colMeta.value.toString());
}
}
/**
* table: ["weather"], column: ["ts, f1, f2, f3, t1, t2"]
* sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...)

View File

@ -1,20 +0,0 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import java.util.Locale;
import java.util.ResourceBundle;
/**
* i18n message util
*/
public class Msg {
private static ResourceBundle bundle;
static {
bundle = ResourceBundle.getBundle("tdenginewritermsg", Locale.getDefault());
}
public static String get(String key) {
return bundle.getString(key);
}
}

View File

@ -81,7 +81,7 @@ public class SchemaManager {
for (String tbname : tables) {
if (!tableMetas.containsKey(tbname)) {
LOG.error("table metadata of " + tbname + " is empty!");
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, "table metadata of " + tbname + " is empty!");
}
}
} catch (SQLException e) {

View File

@ -3,7 +3,7 @@
"class": "com.alibaba.datax.plugin.writer.tdenginewriter.TDengineWriter",
"description": {
"useScene": "data migration to tdengine",
"mechanism": "use JNI or taos-jdbc to write data to tdengine."
"mechanism": "use taos-jdbcdriver to write data."
},
"developer": "zyyang-taosdata"
"developer": "support@taosdata.com"
}

View File

@ -1,24 +1,20 @@
{
"name": "tdenginewriter",
"parameter": {
"host": "127.0.0.1",
"port": 6030,
"dbname": "test",
"user": "root",
"password": "taosdata",
"batchSize": 1000,
"stable": "weather",
"tagColumn": {
"station": 0
},
"fieldColumn": {
"latitude": 1,
"longtitude": 2,
"tmax": 4,
"tmin": 5
},
"timestampColumn":{
"date": 3
}
}
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
""
],
"connection": [
{
"table": [
""
],
"jdbcUrl": ""
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}

View File

@ -1,6 +0,0 @@
try_get_schema_from_db=fail to get structure info of target table from configure file and will try to get it from database
batch_size_too_small='batchSize' is too small, please increase it and try again
column_number_error=number of columns is less than expected
tag_value_error=tag columns include 'null' value
ts_value_error=timestamp column type error or null
infer_column_type_error=fail to infer column type: sample count %d, column index %d

View File

@ -1,6 +0,0 @@
try_get_schema_from_db=fail to get structure info of target table from configure file and will try to get it from database
batch_size_too_small='batchSize' is too small, please increase it and try again
column_number_error=number of columns is less than expected
tag_value_error=tag columns include 'null' value
ts_value_error=timestamp column type error or null
infer_column_type_error=fail to infer column type: sample count %d, column index %d

View File

@ -1,6 +0,0 @@
try_get_schema_from_db=\u65e0\u6cd5\u4ece\u914d\u7f6e\u6587\u4ef6\u83b7\u53d6\u8868\u7ed3\u6784\u4fe1\u606f\uff0c\u5c1d\u8bd5\u4ece\u6570\u636e\u5e93\u83b7\u53d6
batch_size_too_small=batchSize\u592a\u5c0f\uff0c\u4f1a\u589e\u52a0\u81ea\u52a8\u7c7b\u578b\u63a8\u65ad\u9519\u8bef\u7684\u6982\u7387\uff0c\u5efa\u8bae\u6539\u5927\u540e\u91cd\u8bd5
column_number_error=\u5b9e\u9645\u5217\u6570\u5c0f\u4e8e\u671f\u671b\u5217\u6570
tag_value_error=\u6807\u7b7e\u5217\u5305\u542bnull
ts_value_error=\u65f6\u95f4\u6233\u5217\u4e3anull\u6216\u7c7b\u578b\u9519\u8bef
infer_column_type_error=\u6839\u636e\u91c7\u6837\u7684%d\u6761\u6570\u636e\uff0c\u65e0\u6cd5\u63a8\u65ad\u7b2c%d\u5217\u7684\u6570\u636e\u7c7b\u578b

View File

@ -27,6 +27,8 @@ public class DefaultDataHandlerTest2 {
@Test
public void writeSupTableBySchemaless() throws SQLException {
// given
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +

View File

@ -1,65 +0,0 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.core.Engine;
import org.junit.Assert;
import org.junit.Test;
import java.sql.*;
public class EngineTest {
@Test
public void opentsdb2tdengine() throws SQLException {
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/opentsdb2tdengine.json"};
System.setProperty("datax.home", "../target/datax/datax");
try {
Engine.entry(params);
} catch (Throwable e) {
e.printStackTrace();
}
// assert
String jdbcUrl = "jdbc:TAOS://192.168.56.105:6030/test?timestampFormat=TIMESTAMP";
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("select count(*) from weather_temperature");
int rows = 0;
while (rs.next()) {
rows = rs.getInt("count(*)");
}
Assert.assertEquals(5, rows);
stmt.close();
}
}
@Test
public void mysql2tdengine() {
// given
// MYSQL SQL:
// create table t(id int primary key AUTO_INCREMENT, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, f6 double,ts timestamp, dt datetime,f7 nchar(64))
// insert into t(f1,f2,f3,f4,f5,f6,ts,dt,f7) values(1,2,3,4,5,6,'2022-01-28 12:00:00','2022-01-28 12:00:00', 'beijing');
// TDengine SQL:
// create table t(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, f6 double, dt timestamp,f7 nchar(64));
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/mysql2tdengine.json"};
System.setProperty("datax.home", "../target/datax/datax");
try {
Engine.entry(params);
} catch (Throwable e) {
e.printStackTrace();
}
}
@Test
public void tdengine2tdengine() {
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/tdengine2tdengine.json"};
System.setProperty("datax.home", "../target/datax/datax");
try {
Engine.entry(params);
} catch (Throwable e) {
e.printStackTrace();
}
}
}

View File

@ -1,25 +0,0 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import org.junit.Test;
import java.util.Locale;
import java.util.ResourceBundle;
import org.junit.Assert;
public class MessageTest {
@Test
public void testChineseMessage() {
Locale local = new Locale("zh", "CN");
ResourceBundle bundle = ResourceBundle.getBundle("tdenginewritermsg", local);
String msg = bundle.getString("try_get_schema_fromdb");
Assert.assertEquals("无法从配置文件获取表结构信息,尝试从数据库获取", msg);
}
@Test
public void testDefaultMessage() {
ResourceBundle bundle = ResourceBundle.getBundle("tdenginewritermsg", Locale.getDefault());
String msg = bundle.getString("try_get_schema_fromdb");
System.out.println(msg);
}
}

View File

@ -0,0 +1,70 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.core.Engine;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Random;
public class Mysql2TDengineTest {
private static final String host1 = "192.168.56.105";
private static final String host2 = "192.168.1.93";
private static final Random random = new Random(System.currentTimeMillis());
@Test
public void mysql2tdengine() throws Throwable {
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/m2t-1.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Before
public void before() throws SQLException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String ts = sdf.format(new Date(System.currentTimeMillis()));
final String url = "jdbc:mysql://" + host1 + ":3306/?useSSL=false&useUnicode=true&charset=UTF-8&generateSimpleParameterMetadata=true";
try (Connection conn = DriverManager.getConnection(url, "root", "123456")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db1");
stmt.execute("create database if not exists db1");
stmt.execute("use db1");
stmt.execute("create table stb1(id int primary key AUTO_INCREMENT, " +
"f1 tinyint, f2 smallint, f3 int, f4 bigint, " +
"f5 float, f6 double, " +
"ts timestamp, dt datetime," +
"f7 nchar(100), f8 varchar(100))");
for (int i = 1; i <= 10; i++) {
String sql = "insert into stb1(f1, f2, f3, f4, f5, f6, ts, dt, f7, f8) values(" +
i + "," + random.nextInt(100) + "," + random.nextInt(100) + "," + random.nextInt(100) + "," +
random.nextFloat() + "," + random.nextDouble() + ", " +
"'" + ts + "', '" + ts + "', " +
"'中国北京朝阳望京abc', '中国北京朝阳望京adc')";
stmt.execute(sql);
}
stmt.close();
}
final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041/";
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db2");
stmt.execute("create database if not exists db2");
stmt.execute("create table db2.stb2(" +
"ts timestamp, dt timestamp, " +
"f1 tinyint, f2 smallint, f3 int, f4 bigint, " +
"f5 float, f6 double, " +
"f7 nchar(100), f8 nchar(100))");
stmt.close();
}
}
}

View File

@ -0,0 +1,36 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.core.Engine;
import org.junit.Assert;
import org.junit.Test;
import java.sql.*;
public class Opentsdb2TDengineTest {
@Test
public void opentsdb2tdengine() throws SQLException {
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/opentsdb2tdengine.json"};
System.setProperty("datax.home", "../target/datax/datax");
try {
Engine.entry(params);
} catch (Throwable e) {
e.printStackTrace();
}
// assert
String jdbcUrl = "jdbc:TAOS://192.168.56.105:6030/test?timestampFormat=TIMESTAMP";
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("select count(*) from weather_temperature");
int rows = 0;
while (rs.next()) {
rows = rs.getInt("count(*)");
}
Assert.assertEquals(5, rows);
stmt.close();
}
}
}

View File

@ -0,0 +1,4 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
public class Rdbms2TDengineTest {
}

View File

@ -0,0 +1,68 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.core.Engine;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import java.util.Random;
public class TDengine2TDengineTest {
private static final String host1 = "192.168.56.105";
private static final String host2 = "192.168.1.93";
private static final String db1 = "db1";
private static final String stb1 = "stb1";
private static final String db2 = "db2";
private static final String stb2 = "stb2";
private static Random random = new Random(System.currentTimeMillis());
@Test
public void case_01() throws Throwable {
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2t-1.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Test
public void case_02() throws Throwable {
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2t-2.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Before
public void before() throws SQLException {
final String url = "jdbc:TAOS-RS://" + host1 + ":6041";
try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists " + db1);
stmt.execute("create database if not exists " + db1);
stmt.execute("create table " + db1 + "." + stb1 + " (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint," +
" f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, " +
"t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(1000))");
for (int i = 0; i < 10; i++) {
String sql = "insert into " + db1 + ".t" + (i + 1) + " using " + db1 + "." + stb1 + " tags(now+" + i + "s," +
random.nextInt(100) + "," + random.nextInt(100) + "," + random.nextInt(100) + "," +
random.nextInt(100) + "," + random.nextFloat() + "," + random.nextDouble() + "," +
random.nextBoolean() + ",'abc123ABC','北京朝阳望京') values(now+" + i + "s, " +
random.nextInt(100) + "," + random.nextInt(100) + "," + random.nextInt(100) + "," +
random.nextInt(100) + "," + random.nextFloat() + "," + random.nextDouble() + "," +
random.nextBoolean() + ",'abc123ABC','北京朝阳望京')";
stmt.execute(sql);
}
}
final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041";
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists " + db2);
stmt.execute("create database if not exists " + db2);
stmt.execute("create table " + db2 + "." + stb2 + " (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint," +
" f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, " +
"t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(1000))");
stmt.close();
}
}
}

View File

@ -0,0 +1,62 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.core.Engine;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Random;
public class TDengine2TDengineTest3 {
private static final String host1 = "192.168.56.105";
private static final String host2 = "192.168.1.93";
private static Random random = new Random(System.currentTimeMillis());
@Test
public void case_03() throws Throwable {
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2t-3.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Before
public void before() throws SQLException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String ts = sdf.format(new Date(System.currentTimeMillis()));
final String url = "jdbc:TAOS-RS://" + host1 + ":6041";
try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db1");
stmt.execute("create database if not exists db1");
stmt.execute("create table db1.stb1 (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, " +
"f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, " +
"t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(1000))");
for (int i = 1; i <= 10; i++) {
String sql = "insert into db1.t" + i + " using db1.stb1 tags('" + ts + "'," + i + ",2,3,4,5.0,6.0,true,'abc123ABC','北京朝阳望京') " +
"values(now+" + i + "s, " + random.nextInt(100) + "," + random.nextInt(100) + "," +
random.nextInt(100) + "," + random.nextInt(100) + "," + random.nextFloat() + "," +
random.nextDouble() + "," + random.nextBoolean() + ",'abc123ABC','北京朝阳望京')";
stmt.execute(sql);
}
}
final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041";
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db2");
stmt.execute("create database if not exists db2");
stmt.execute("create table db2.stb2 (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint," +
" f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100)) tags(t1 timestamp, t2 tinyint, " +
"t3 smallint, t4 int, t5 bigint, t6 float, t7 double, t8 bool, t9 binary(100), t10 nchar(1000))");
stmt.execute("create table db2.t1 using db2.stb2 tags('" + ts + "',1,2,3,4,5.0,6.0,true,'abc123ABC','北京朝阳望京')");
stmt.close();
}
}
}

View File

@ -0,0 +1,57 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.core.Engine;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Random;
public class TDengine2TDengineTest4 {
private static final String host1 = "192.168.56.105";
private static final String host2 = "192.168.1.93";
private static Random random = new Random(System.currentTimeMillis());
@Test
public void case_04() throws Throwable {
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/t2t-4.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Before
public void before() throws SQLException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String ts = sdf.format(new Date(System.currentTimeMillis()));
final String url = "jdbc:TAOS-RS://" + host1 + ":6041";
try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db1");
stmt.execute("create database if not exists db1");
stmt.execute("create table db1.weather (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, " +
"f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100))");
for (int i = 1; i <= 10; i++) {
String sql = "insert into db1.weather values(now+" + i + "s, " + random.nextInt(100) + "," + random.nextInt(100) + "," +
random.nextInt(100) + "," + random.nextInt(100) + "," + random.nextFloat() + "," +
random.nextDouble() + "," + random.nextBoolean() + ",'abc123ABC','北京朝阳望京')";
stmt.execute(sql);
}
}
final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041";
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db2");
stmt.execute("create database if not exists db2");
stmt.execute("create table db2.weather (ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, " +
"f5 float, f6 double, f7 bool, f8 binary(100), f9 nchar(100))");
stmt.close();
}
}
}

View File

@ -8,16 +8,25 @@
"username": "root",
"password": "123456",
"column": [
"*"
"ts",
"dt",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8"
],
"splitPk": "id",
"connection": [
{
"table": [
"t"
"stb1"
],
"jdbcUrl": [
"jdbc:mysql://192.168.56.105:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8"
"jdbc:mysql://192.168.56.105:3306/db1?useSSL=false&useUnicode=true&characterEncoding=utf8"
]
}
]
@ -30,16 +39,22 @@
"password": "taosdata",
"column": [
"ts",
"dt",
"f1",
"f2",
"t1"
"f3",
"f4",
"f5",
"f6",
"f7",
"f8"
],
"connection": [
{
"table": [
"st"
"stb2"
],
"jdbcUrl": "jdbc:TAOS://192.168.56.105:6030/test?timestampFormat=TIMESTAMP"
"jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
}
],
"batchSize": 1000,

View File

@ -0,0 +1,94 @@
{
"job": {
"content": [
{
"reader": {
"name": "tdenginereader",
"parameter": {
"username": "root",
"password": "taosdata",
"connection": [
{
"table": [
"stb1"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1?timestampFormat=TIMESTAMP"
}
],
"column": [
"tbname",
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"t1",
"t2",
"t3",
"t4",
"t5",
"t6",
"t7",
"t8",
"t9",
"t10"
],
"beginDateTime": "2022-02-15 00:00:00",
"endDateTime": "2022-02-16 00:00:00",
"splitInterval": "1d"
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"tbname",
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"t1",
"t2",
"t3",
"t4",
"t5",
"t6",
"t7",
"t8",
"t9",
"t10"
],
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2?timestampFormat=TIMESTAMP"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,92 @@
{
"job": {
"content": [
{
"reader": {
"name": "tdenginereader",
"parameter": {
"username": "root",
"password": "taosdata",
"connection": [
{
"table": [
"stb1"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1?timestampFormat=TIMESTAMP"
}
],
"column": [
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"t1",
"t2",
"t3",
"t4",
"t5",
"t6",
"t7",
"t8",
"t9",
"t10"
],
"beginDateTime": "2022-02-15 00:00:00",
"endDateTime": "2022-02-16 00:00:00",
"splitInterval": "1d"
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"t1",
"t2",
"t3",
"t4",
"t5",
"t6",
"t7",
"t8",
"t9",
"t10"
],
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:TAOS://192.168.1.93:6030/db2"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,92 @@
{
"job": {
"content": [
{
"reader": {
"name": "tdenginereader",
"parameter": {
"username": "root",
"password": "taosdata",
"connection": [
{
"table": [
"stb1"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1?timestampFormat=TIMESTAMP"
}
],
"column": [
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"t1",
"t2",
"t3",
"t4",
"t5",
"t6",
"t7",
"t8",
"t9",
"t10"
],
"beginDateTime": "2022-02-15 00:00:00",
"endDateTime": "2022-02-16 00:00:00",
"splitInterval": "1d"
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9",
"t1",
"t2",
"t3",
"t4",
"t5",
"t6",
"t7",
"t8",
"t9",
"t10"
],
"connection": [
{
"table": [
"t1"
],
"jdbcUrl": "jdbc:TAOS://192.168.1.93:6030/db2?timestampFormat=TIMESTAMP"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,72 @@
{
"job": {
"content": [
{
"reader": {
"name": "tdenginereader",
"parameter": {
"username": "root",
"password": "taosdata",
"connection": [
{
"table": [
"weather"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/db1?timestampFormat=TIMESTAMP"
}
],
"column": [
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9"
],
"beginDateTime": "2022-02-15 00:00:00",
"endDateTime": "2022-02-16 00:00:00",
"splitInterval": "1d"
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"ts",
"f1",
"f2",
"f3",
"f4",
"f5",
"f6",
"f7",
"f8",
"f9"
],
"connection": [
{
"table": [
"weather"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.1.93:6041/db2"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -1,51 +0,0 @@
{
"job": {
"content": [
{
"reader": {
"name": "tdenginereader",
"parameter": {
"username": "root",
"password": "taosdata",
"connection": [
{
"table": [
"meters"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/test?timestampFormat=TIMESTAMP"
}
],
"column": [
"ts",
"current",
"voltage",
"phase",
"groupid",
"location"
],
"beginDateTime": "2017-07-14 10:40:00",
"endDateTime": "2017-08-14 10:40:00",
"splitInterval": "1d"
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"host": "192.168.56.105",
"port": 6030,
"dbName": "test2",
"username": "root",
"password": "taosdata",
"batchSize": 1000,
"stable": "meters"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}