fix: null pointer exception since column size less then columnMeta. #TS-1784

This commit is contained in:
zyyang 2022-08-18 10:09:17 +08:00
parent 50d30d941c
commit 30b0350ec0
6 changed files with 291 additions and 15 deletions

View File

@ -82,6 +82,18 @@ public class DefaultDataHandler implements DataHandler {
// prepare table_name -> column_meta
this.tbnameColumnMetasMap = schemaManager.loadColumnMetas(tables);
// filter column
for (String tableName : tbnameColumnMetasMap.keySet()) {
List<ColumnMeta> columnMetaList = tbnameColumnMetasMap.get(tableName);
Iterator<ColumnMeta> iterator = columnMetaList.iterator();
while (iterator.hasNext()) {
ColumnMeta columnMeta = iterator.next();
if (!this.columns.contains(columnMeta.field)) {
iterator.remove();
}
}
}
List<Record> recordBatch = new ArrayList<>();
Record record;
for (int i = 1; (record = lineReceiver.getFromReader()) != null; i++) {
@ -226,14 +238,18 @@ public class DefaultDataHandler implements DataHandler {
ColumnMeta columnMeta = columnMetas.get(colIndex);
if (columnMeta.isTag) {
Column column = record.getColumn(colIndex);
switch (columnMeta.type) {
case "TINYINT":
case "SMALLINT":
case "INT":
case "BIGINT":
return column.asLong().toString();
default:
return column.asString();
try {
switch (columnMeta.type) {
case "TINYINT":
case "SMALLINT":
case "INT":
case "BIGINT":
return column.asLong().toString();
default:
return column.asString();
}
} catch (Exception e) {
LOG.error("failed to get Tag, colIndex: " + colIndex + ", ColumnMeta: " + columnMeta + ", record: " + record, e);
}
}
return "";
@ -250,8 +266,8 @@ public class DefaultDataHandler implements DataHandler {
StringBuilder sb = new StringBuilder("insert into");
for (Record record : recordBatch) {
sb.append(" ").append(record.getColumn(indexOf("tbname")).asString())
.append(" using ").append(table)
sb.append(" `").append(record.getColumn(indexOf("tbname")).asString())
.append("` using ").append(table)
.append(" tags")
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return colMeta.isTag;
@ -470,7 +486,7 @@ public class DefaultDataHandler implements DataHandler {
List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);
StringBuilder sb = new StringBuilder();
sb.append("insert into ").append(table).append(" ")
sb.append("insert into `").append(table).append("` ")
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return !colMeta.isTag;
}).map(colMeta -> {
@ -540,8 +556,8 @@ public class DefaultDataHandler implements DataHandler {
List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);
StringBuilder sb = new StringBuilder();
sb.append("insert into ").append(table)
.append(" ")
sb.append("insert into `").append(table)
.append("` ")
.append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
return colMeta.field;
}).collect(Collectors.joining(",", "(", ")")))

View File

@ -11,7 +11,7 @@ import java.util.Random;
public class Mysql2TDengineTest {
private static final String host1 = "192.168.56.105";
private static final String host2 = "192.168.1.93";
private static final String host2 = "192.168.56.105";
private static final Random random = new Random(System.currentTimeMillis());
@Test
@ -21,6 +21,13 @@ public class Mysql2TDengineTest {
Engine.entry(params);
}
@Test
public void test2() throws Throwable {
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/m2t-2.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Before
public void before() throws SQLException {
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
@ -50,7 +57,7 @@ public class Mysql2TDengineTest {
stmt.close();
}
final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041/";
final String url2 = "jdbc:TAOS://" + host2 + ":6030/";
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
Statement stmt = conn.createStatement();

View File

@ -0,0 +1,67 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.core.Engine;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.StringUtils;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Random;
public class Mysql2TDengineTest2 {
private static final String host1 = "192.168.56.105";
private static final String host2 = "192.168.56.105";
private static final Random random = new Random(System.currentTimeMillis());
@Test
public void test2() throws Throwable {
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/m2t-2.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Before
public void before() throws SQLException {
final String[] tagList = {"北京", "海淀", "上海", "河北", "天津"};
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
String ts = sdf.format(new Date(System.currentTimeMillis()));
final String url = "jdbc:mysql://" + host1 + ":3306/?useSSL=false&useUnicode=true&charset=UTF-8&generateSimpleParameterMetadata=true";
try (Connection conn = DriverManager.getConnection(url, "root", "123456")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db1");
stmt.execute("create database if not exists db1");
stmt.execute("use db1");
stmt.execute("create table stb1(id int primary key AUTO_INCREMENT, " +
"f1 int, f2 float, f3 double, f4 varchar(100), t1 varchar(100), ts timestamp)");
for (int i = 1; i <= 10; i++) {
String sql = "insert into stb1(f1, f2, f3, f4, t1, ts) values("
+ random.nextInt(100) + "," + random.nextFloat() * 100 + "," + random.nextDouble() * 100
+ ",'" + RandomStringUtils.randomAlphanumeric(10)
+ "', '" + tagList[random.nextInt(tagList.length)]
+ "', '" + (ts + i * 1000) + "')";
stmt.execute(sql);
}
stmt.close();
}
final String url2 = "jdbc:TAOS://" + host2 + ":6030/";
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db2");
stmt.execute("create database if not exists db2");
stmt.execute("create table db2.stb2(ts timestamp, f1 int, f2 float, f3 double, f4 nchar(100)) tags(t1 nchar(100))");
stmt.close();
}
}
}

View File

@ -0,0 +1,76 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.core.Engine;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.Before;
import org.junit.Test;
import java.sql.*;
import java.text.SimpleDateFormat;
import java.util.Random;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class Mysql2TDengineTest3 {
private static final String host1 = "192.168.56.105";
private static final String host2 = "192.168.56.105";
private static final Random random = new Random(System.currentTimeMillis());
@Test
public void test2() throws Throwable {
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/m2t-3.json"};
System.setProperty("datax.home", "../target/datax/datax");
Engine.entry(params);
}
@Before
public void before() throws SQLException {
// given
long ts_start = new Date(System.currentTimeMillis()).getTime();
final int columnSize = 10;
final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
final String url = "jdbc:mysql://" + host1 + ":3306/?useSSL=false&useUnicode=true&charset=UTF-8&generateSimpleParameterMetadata=true";
try (Connection conn = DriverManager.getConnection(url, "root", "123456")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db1");
stmt.execute("create database if not exists db1");
stmt.execute("use db1");
stmt.execute("create table stb1(id int primary key AUTO_INCREMENT, "
+ IntStream.range(1, columnSize).mapToObj(i -> "f" + i + " int").collect(Collectors.joining(",")) + ", "
+ IntStream.range(1, columnSize).mapToObj(i -> "t" + i + " varchar(20)").collect(Collectors.joining(",")) + ", ts timestamp)");
for (int i = 1; i <= 10; i++) {
String sql = "insert into stb1("
+ IntStream.range(1, columnSize).mapToObj(index -> "f" + index).collect(Collectors.joining(",")) + ", "
+ IntStream.range(1, columnSize).mapToObj(index -> "t" + index).collect(Collectors.joining(","))
+ ", ts) values("
+ IntStream.range(1, columnSize).mapToObj(index -> random.nextInt(10) + "").collect(Collectors.joining(","))
+ ","
+ IntStream.range(1, columnSize).mapToObj(index -> "'" + RandomStringUtils.randomAlphanumeric(15) + "'").collect(Collectors.joining(","))
+ ", '" + sdf.format(new Date(ts_start + i * 1000)) + "')";
stmt.execute(sql);
}
stmt.close();
}
final String url2 = "jdbc:TAOS://" + host2 + ":6030/";
try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) {
Statement stmt = conn.createStatement();
stmt.execute("drop database if exists db2");
stmt.execute("create database if not exists db2");
stmt.execute("create table db2.stb2(ts timestamp, "
+ IntStream.range(1, 101).mapToObj(i -> "f" + i + " int").collect(Collectors.joining(",")) + ") tags("
+ IntStream.range(1, 101).mapToObj(i -> "t" + i + " nchar(20)").collect(Collectors.joining(","))
+ ")"
);
stmt.close();
}
}
}

View File

@ -0,0 +1,57 @@
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"splitPk": "id",
"connection": [
{
"querySql": [
"select t1 as tbname, ts, f1,f2,f3,f4,t1 from stb1"
],
"jdbcUrl": [
"jdbc:mysql://192.168.56.105:3306/db1?useSSL=false&useUnicode=true&characterEncoding=utf8"
]
}
]
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"tbname",
"ts",
"f1",
"f2",
"f3",
"f4",
"t1"
],
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:TAOS://192.168.56.105:6030/db2"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,53 @@
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"splitPk": "id",
"connection": [
{
"querySql": [
"select ts,f1,t1 from stb1"
],
"jdbcUrl": [
"jdbc:mysql://192.168.56.105:3306/db1?useSSL=false&useUnicode=true&characterEncoding=utf8"
]
}
]
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"ts",
"f1",
"t1"
],
"connection": [
{
"table": [
"stb2"
],
"jdbcUrl": "jdbc:TAOS://192.168.56.105:6030/db2"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}