mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-04 03:40:58 +08:00
Merge pull request #34 from taosdata/fix/TS-1784
fix: null pointer exception since column size less than columnMeta.
This commit is contained in:
commit
ac6d04dc8f
@ -82,6 +82,18 @@ public class DefaultDataHandler implements DataHandler {
|
||||
// prepare table_name -> column_meta
|
||||
this.tbnameColumnMetasMap = schemaManager.loadColumnMetas(tables);
|
||||
|
||||
// filter column
|
||||
for (String tableName : tbnameColumnMetasMap.keySet()) {
|
||||
List<ColumnMeta> columnMetaList = tbnameColumnMetasMap.get(tableName);
|
||||
Iterator<ColumnMeta> iterator = columnMetaList.iterator();
|
||||
while (iterator.hasNext()) {
|
||||
ColumnMeta columnMeta = iterator.next();
|
||||
if (!this.columns.contains(columnMeta.field)) {
|
||||
iterator.remove();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
List<Record> recordBatch = new ArrayList<>();
|
||||
Record record;
|
||||
for (int i = 1; (record = lineReceiver.getFromReader()) != null; i++) {
|
||||
@ -226,14 +238,18 @@ public class DefaultDataHandler implements DataHandler {
|
||||
ColumnMeta columnMeta = columnMetas.get(colIndex);
|
||||
if (columnMeta.isTag) {
|
||||
Column column = record.getColumn(colIndex);
|
||||
switch (columnMeta.type) {
|
||||
case "TINYINT":
|
||||
case "SMALLINT":
|
||||
case "INT":
|
||||
case "BIGINT":
|
||||
return column.asLong().toString();
|
||||
default:
|
||||
return column.asString();
|
||||
try {
|
||||
switch (columnMeta.type) {
|
||||
case "TINYINT":
|
||||
case "SMALLINT":
|
||||
case "INT":
|
||||
case "BIGINT":
|
||||
return column.asLong().toString();
|
||||
default:
|
||||
return column.asString();
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("failed to get Tag, colIndex: " + colIndex + ", ColumnMeta: " + columnMeta + ", record: " + record, e);
|
||||
}
|
||||
}
|
||||
return "";
|
||||
@ -250,8 +266,8 @@ public class DefaultDataHandler implements DataHandler {
|
||||
|
||||
StringBuilder sb = new StringBuilder("insert into");
|
||||
for (Record record : recordBatch) {
|
||||
sb.append(" ").append(record.getColumn(indexOf("tbname")).asString())
|
||||
.append(" using ").append(table)
|
||||
sb.append(" `").append(record.getColumn(indexOf("tbname")).asString())
|
||||
.append("` using ").append(table)
|
||||
.append(" tags")
|
||||
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
|
||||
return colMeta.isTag;
|
||||
@ -470,7 +486,7 @@ public class DefaultDataHandler implements DataHandler {
|
||||
List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("insert into ").append(table).append(" ")
|
||||
sb.append("insert into `").append(table).append("` ")
|
||||
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
|
||||
return !colMeta.isTag;
|
||||
}).map(colMeta -> {
|
||||
@ -540,8 +556,8 @@ public class DefaultDataHandler implements DataHandler {
|
||||
List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("insert into ").append(table)
|
||||
.append(" ")
|
||||
sb.append("insert into `").append(table)
|
||||
.append("` ")
|
||||
.append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
|
||||
return colMeta.field;
|
||||
}).collect(Collectors.joining(",", "(", ")")))
|
||||
|
@ -11,7 +11,7 @@ import java.util.Random;
|
||||
public class Mysql2TDengineTest {
|
||||
|
||||
private static final String host1 = "192.168.56.105";
|
||||
private static final String host2 = "192.168.1.93";
|
||||
private static final String host2 = "192.168.56.105";
|
||||
private static final Random random = new Random(System.currentTimeMillis());
|
||||
|
||||
@Test
|
||||
@ -21,6 +21,13 @@ public class Mysql2TDengineTest {
|
||||
Engine.entry(params);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void test2() throws Throwable {
|
||||
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/m2t-2.json"};
|
||||
System.setProperty("datax.home", "../target/datax/datax");
|
||||
Engine.entry(params);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void before() throws SQLException {
|
||||
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
|
||||
@ -50,7 +57,7 @@ public class Mysql2TDengineTest {
|
||||
stmt.close();
|
||||
}
|
||||
|
||||
final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041/";
|
||||
final String url2 = "jdbc:TAOS://" + host2 + ":6030/";
|
||||
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
|
||||
Statement stmt = conn.createStatement();
|
||||
|
||||
|
@ -0,0 +1,67 @@
|
||||
package com.alibaba.datax.plugin.writer.tdenginewriter;
|
||||
|
||||
import com.alibaba.datax.core.Engine;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.sql.*;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Random;
|
||||
|
||||
public class Mysql2TDengineTest2 {
|
||||
|
||||
private static final String host1 = "192.168.56.105";
|
||||
private static final String host2 = "192.168.56.105";
|
||||
private static final Random random = new Random(System.currentTimeMillis());
|
||||
|
||||
@Test
|
||||
public void test2() throws Throwable {
|
||||
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/m2t-2.json"};
|
||||
System.setProperty("datax.home", "../target/datax/datax");
|
||||
Engine.entry(params);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void before() throws SQLException {
|
||||
final String[] tagList = {"北京", "海淀", "上海", "河北", "天津"};
|
||||
|
||||
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
|
||||
String ts = sdf.format(new Date(System.currentTimeMillis()));
|
||||
|
||||
final String url = "jdbc:mysql://" + host1 + ":3306/?useSSL=false&useUnicode=true&charset=UTF-8&generateSimpleParameterMetadata=true";
|
||||
try (Connection conn = DriverManager.getConnection(url, "root", "123456")) {
|
||||
Statement stmt = conn.createStatement();
|
||||
|
||||
stmt.execute("drop database if exists db1");
|
||||
stmt.execute("create database if not exists db1");
|
||||
stmt.execute("use db1");
|
||||
stmt.execute("create table stb1(id int primary key AUTO_INCREMENT, " +
|
||||
"f1 int, f2 float, f3 double, f4 varchar(100), t1 varchar(100), ts timestamp)");
|
||||
for (int i = 1; i <= 10; i++) {
|
||||
String sql = "insert into stb1(f1, f2, f3, f4, t1, ts) values("
|
||||
+ random.nextInt(100) + "," + random.nextFloat() * 100 + "," + random.nextDouble() * 100
|
||||
+ ",'" + RandomStringUtils.randomAlphanumeric(10)
|
||||
+ "', '" + tagList[random.nextInt(tagList.length)]
|
||||
+ "', '" + (ts + i * 1000) + "')";
|
||||
stmt.execute(sql);
|
||||
}
|
||||
|
||||
stmt.close();
|
||||
}
|
||||
|
||||
final String url2 = "jdbc:TAOS://" + host2 + ":6030/";
|
||||
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
|
||||
Statement stmt = conn.createStatement();
|
||||
|
||||
stmt.execute("drop database if exists db2");
|
||||
stmt.execute("create database if not exists db2");
|
||||
stmt.execute("create table db2.stb2(ts timestamp, f1 int, f2 float, f3 double, f4 nchar(100)) tags(t1 nchar(100))");
|
||||
|
||||
stmt.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,76 @@
|
||||
package com.alibaba.datax.plugin.writer.tdenginewriter;
|
||||
|
||||
import com.alibaba.datax.core.Engine;
|
||||
import org.apache.commons.lang3.RandomStringUtils;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.sql.*;
|
||||
import java.text.SimpleDateFormat;
|
||||
import java.util.Random;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.IntStream;
|
||||
|
||||
public class Mysql2TDengineTest3 {
|
||||
|
||||
private static final String host1 = "192.168.56.105";
|
||||
private static final String host2 = "192.168.56.105";
|
||||
private static final Random random = new Random(System.currentTimeMillis());
|
||||
|
||||
@Test
|
||||
public void test2() throws Throwable {
|
||||
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/m2t-3.json"};
|
||||
System.setProperty("datax.home", "../target/datax/datax");
|
||||
Engine.entry(params);
|
||||
}
|
||||
|
||||
@Before
|
||||
public void before() throws SQLException {
|
||||
// given
|
||||
long ts_start = new Date(System.currentTimeMillis()).getTime();
|
||||
final int columnSize = 10;
|
||||
final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
|
||||
|
||||
final String url = "jdbc:mysql://" + host1 + ":3306/?useSSL=false&useUnicode=true&charset=UTF-8&generateSimpleParameterMetadata=true";
|
||||
try (Connection conn = DriverManager.getConnection(url, "root", "123456")) {
|
||||
Statement stmt = conn.createStatement();
|
||||
|
||||
stmt.execute("drop database if exists db1");
|
||||
stmt.execute("create database if not exists db1");
|
||||
stmt.execute("use db1");
|
||||
stmt.execute("create table stb1(id int primary key AUTO_INCREMENT, "
|
||||
+ IntStream.range(1, columnSize).mapToObj(i -> "f" + i + " int").collect(Collectors.joining(",")) + ", "
|
||||
+ IntStream.range(1, columnSize).mapToObj(i -> "t" + i + " varchar(20)").collect(Collectors.joining(",")) + ", ts timestamp)");
|
||||
for (int i = 1; i <= 10; i++) {
|
||||
String sql = "insert into stb1("
|
||||
+ IntStream.range(1, columnSize).mapToObj(index -> "f" + index).collect(Collectors.joining(",")) + ", "
|
||||
+ IntStream.range(1, columnSize).mapToObj(index -> "t" + index).collect(Collectors.joining(","))
|
||||
+ ", ts) values("
|
||||
+ IntStream.range(1, columnSize).mapToObj(index -> random.nextInt(10) + "").collect(Collectors.joining(","))
|
||||
+ ","
|
||||
+ IntStream.range(1, columnSize).mapToObj(index -> "'" + RandomStringUtils.randomAlphanumeric(15) + "'").collect(Collectors.joining(","))
|
||||
+ ", '" + sdf.format(new Date(ts_start + i * 1000)) + "')";
|
||||
stmt.execute(sql);
|
||||
}
|
||||
|
||||
stmt.close();
|
||||
}
|
||||
|
||||
final String url2 = "jdbc:TAOS://" + host2 + ":6030/";
|
||||
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
|
||||
Statement stmt = conn.createStatement();
|
||||
|
||||
stmt.execute("drop database if exists db2");
|
||||
stmt.execute("create database if not exists db2");
|
||||
stmt.execute("create table db2.stb2(ts timestamp, "
|
||||
+ IntStream.range(1, 101).mapToObj(i -> "f" + i + " int").collect(Collectors.joining(",")) + ") tags("
|
||||
+ IntStream.range(1, 101).mapToObj(i -> "t" + i + " nchar(20)").collect(Collectors.joining(","))
|
||||
+ ")"
|
||||
);
|
||||
|
||||
stmt.close();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
}
|
57
tdenginewriter/src/test/resources/m2t-2.json
Normal file
57
tdenginewriter/src/test/resources/m2t-2.json
Normal file
@ -0,0 +1,57 @@
|
||||
{
|
||||
"job": {
|
||||
"content": [
|
||||
{
|
||||
"reader": {
|
||||
"name": "mysqlreader",
|
||||
"parameter": {
|
||||
"username": "root",
|
||||
"password": "123456",
|
||||
"splitPk": "id",
|
||||
"connection": [
|
||||
{
|
||||
"querySql": [
|
||||
"select t1 as tbname, ts, f1,f2,f3,f4,t1 from stb1"
|
||||
],
|
||||
"jdbcUrl": [
|
||||
"jdbc:mysql://192.168.56.105:3306/db1?useSSL=false&useUnicode=true&characterEncoding=utf8"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"writer": {
|
||||
"name": "tdenginewriter",
|
||||
"parameter": {
|
||||
"username": "root",
|
||||
"password": "taosdata",
|
||||
"column": [
|
||||
"tbname",
|
||||
"ts",
|
||||
"f1",
|
||||
"f2",
|
||||
"f3",
|
||||
"f4",
|
||||
"t1"
|
||||
],
|
||||
"connection": [
|
||||
{
|
||||
"table": [
|
||||
"stb2"
|
||||
],
|
||||
"jdbcUrl": "jdbc:TAOS://192.168.56.105:6030/db2"
|
||||
}
|
||||
],
|
||||
"batchSize": 1000,
|
||||
"ignoreTagsUnmatched": true
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"setting": {
|
||||
"speed": {
|
||||
"channel": 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
53
tdenginewriter/src/test/resources/m2t-3.json
Normal file
53
tdenginewriter/src/test/resources/m2t-3.json
Normal file
@ -0,0 +1,53 @@
|
||||
{
|
||||
"job": {
|
||||
"content": [
|
||||
{
|
||||
"reader": {
|
||||
"name": "mysqlreader",
|
||||
"parameter": {
|
||||
"username": "root",
|
||||
"password": "123456",
|
||||
"splitPk": "id",
|
||||
"connection": [
|
||||
{
|
||||
"querySql": [
|
||||
"select ts,f1,t1 from stb1"
|
||||
],
|
||||
"jdbcUrl": [
|
||||
"jdbc:mysql://192.168.56.105:3306/db1?useSSL=false&useUnicode=true&characterEncoding=utf8"
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
},
|
||||
"writer": {
|
||||
"name": "tdenginewriter",
|
||||
"parameter": {
|
||||
"username": "root",
|
||||
"password": "taosdata",
|
||||
"column": [
|
||||
"ts",
|
||||
"f1",
|
||||
"t1"
|
||||
],
|
||||
"connection": [
|
||||
{
|
||||
"table": [
|
||||
"stb2"
|
||||
],
|
||||
"jdbcUrl": "jdbc:TAOS://192.168.56.105:6030/db2"
|
||||
}
|
||||
],
|
||||
"batchSize": 1000,
|
||||
"ignoreTagsUnmatched": true
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"setting": {
|
||||
"speed": {
|
||||
"channel": 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user