From 30b0350ec094fea2611505562c9075545b2597f4 Mon Sep 17 00:00:00 2001 From: zyyang Date: Thu, 18 Aug 2022 10:09:17 +0800 Subject: [PATCH] fix: null pointer exception since column size less then columnMeta. #TS-1784 --- .../tdenginewriter/DefaultDataHandler.java | 42 ++++++---- .../tdenginewriter/Mysql2TDengineTest.java | 11 ++- .../tdenginewriter/Mysql2TDengineTest2.java | 67 ++++++++++++++++ .../tdenginewriter/Mysql2TDengineTest3.java | 76 +++++++++++++++++++ tdenginewriter/src/test/resources/m2t-2.json | 57 ++++++++++++++ tdenginewriter/src/test/resources/m2t-3.json | 53 +++++++++++++ 6 files changed, 291 insertions(+), 15 deletions(-) create mode 100644 tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest2.java create mode 100644 tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest3.java create mode 100644 tdenginewriter/src/test/resources/m2t-2.json create mode 100644 tdenginewriter/src/test/resources/m2t-3.json diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java index 27ade382..849486fd 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java @@ -82,6 +82,18 @@ public class DefaultDataHandler implements DataHandler { // prepare table_name -> column_meta this.tbnameColumnMetasMap = schemaManager.loadColumnMetas(tables); + // filter column + for (String tableName : tbnameColumnMetasMap.keySet()) { + List columnMetaList = tbnameColumnMetasMap.get(tableName); + Iterator iterator = columnMetaList.iterator(); + while (iterator.hasNext()) { + ColumnMeta columnMeta = iterator.next(); + if (!this.columns.contains(columnMeta.field)) { + iterator.remove(); + } + } + } + List recordBatch = new ArrayList<>(); Record record; for (int i = 1; (record = lineReceiver.getFromReader()) != null; i++) { @@ -226,14 +238,18 @@ public class DefaultDataHandler implements DataHandler { ColumnMeta columnMeta = columnMetas.get(colIndex); if (columnMeta.isTag) { Column column = record.getColumn(colIndex); - switch (columnMeta.type) { - case "TINYINT": - case "SMALLINT": - case "INT": - case "BIGINT": - return column.asLong().toString(); - default: - return column.asString(); + try { + switch (columnMeta.type) { + case "TINYINT": + case "SMALLINT": + case "INT": + case "BIGINT": + return column.asLong().toString(); + default: + return column.asString(); + } + } catch (Exception e) { + LOG.error("failed to get Tag, colIndex: " + colIndex + ", ColumnMeta: " + columnMeta + ", record: " + record, e); } } return ""; @@ -250,8 +266,8 @@ public class DefaultDataHandler implements DataHandler { StringBuilder sb = new StringBuilder("insert into"); for (Record record : recordBatch) { - sb.append(" ").append(record.getColumn(indexOf("tbname")).asString()) - .append(" using ").append(table) + sb.append(" `").append(record.getColumn(indexOf("tbname")).asString()) + .append("` using ").append(table) .append(" tags") .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { return colMeta.isTag; @@ -470,7 +486,7 @@ public class DefaultDataHandler implements DataHandler { List columnMetas = this.tbnameColumnMetasMap.get(table); StringBuilder sb = new StringBuilder(); - sb.append("insert into ").append(table).append(" ") + sb.append("insert into `").append(table).append("` ") .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { return !colMeta.isTag; }).map(colMeta -> { @@ -540,8 +556,8 @@ public class DefaultDataHandler implements DataHandler { List columnMetas = this.tbnameColumnMetasMap.get(table); StringBuilder sb = new StringBuilder(); - sb.append("insert into ").append(table) - .append(" ") + sb.append("insert into `").append(table) + .append("` ") .append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> { return colMeta.field; }).collect(Collectors.joining(",", "(", ")"))) diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest.java index 4a662711..8bef9c3c 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest.java @@ -11,7 +11,7 @@ import java.util.Random; public class Mysql2TDengineTest { private static final String host1 = "192.168.56.105"; - private static final String host2 = "192.168.1.93"; + private static final String host2 = "192.168.56.105"; private static final Random random = new Random(System.currentTimeMillis()); @Test @@ -21,6 +21,13 @@ public class Mysql2TDengineTest { Engine.entry(params); } + @Test + public void test2() throws Throwable { + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/m2t-2.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + @Before public void before() throws SQLException { SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); @@ -50,7 +57,7 @@ public class Mysql2TDengineTest { stmt.close(); } - final String url2 = "jdbc:TAOS-RS://" + host2 + ":6041/"; + final String url2 = "jdbc:TAOS://" + host2 + ":6030/"; try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) { Statement stmt = conn.createStatement(); diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest2.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest2.java new file mode 100644 index 00000000..98eae4f5 --- /dev/null +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest2.java @@ -0,0 +1,67 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import com.alibaba.datax.core.Engine; +import org.apache.commons.lang3.RandomStringUtils; +import org.apache.commons.lang3.StringUtils; +import org.junit.Before; +import org.junit.Test; + +import java.sql.*; +import java.text.SimpleDateFormat; +import java.util.Random; + +public class Mysql2TDengineTest2 { + + private static final String host1 = "192.168.56.105"; + private static final String host2 = "192.168.56.105"; + private static final Random random = new Random(System.currentTimeMillis()); + + @Test + public void test2() throws Throwable { + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/m2t-2.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + + @Before + public void before() throws SQLException { + final String[] tagList = {"北京", "海淀", "上海", "河北", "天津"}; + + SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + String ts = sdf.format(new Date(System.currentTimeMillis())); + + final String url = "jdbc:mysql://" + host1 + ":3306/?useSSL=false&useUnicode=true&charset=UTF-8&generateSimpleParameterMetadata=true"; + try (Connection conn = DriverManager.getConnection(url, "root", "123456")) { + Statement stmt = conn.createStatement(); + + stmt.execute("drop database if exists db1"); + stmt.execute("create database if not exists db1"); + stmt.execute("use db1"); + stmt.execute("create table stb1(id int primary key AUTO_INCREMENT, " + + "f1 int, f2 float, f3 double, f4 varchar(100), t1 varchar(100), ts timestamp)"); + for (int i = 1; i <= 10; i++) { + String sql = "insert into stb1(f1, f2, f3, f4, t1, ts) values(" + + random.nextInt(100) + "," + random.nextFloat() * 100 + "," + random.nextDouble() * 100 + + ",'" + RandomStringUtils.randomAlphanumeric(10) + + "', '" + tagList[random.nextInt(tagList.length)] + + "', '" + (ts + i * 1000) + "')"; + stmt.execute(sql); + } + + stmt.close(); + } + + final String url2 = "jdbc:TAOS://" + host2 + ":6030/"; + try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) { + Statement stmt = conn.createStatement(); + + stmt.execute("drop database if exists db2"); + stmt.execute("create database if not exists db2"); + stmt.execute("create table db2.stb2(ts timestamp, f1 int, f2 float, f3 double, f4 nchar(100)) tags(t1 nchar(100))"); + + stmt.close(); + } + + } + +} diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest3.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest3.java new file mode 100644 index 00000000..7061445b --- /dev/null +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Mysql2TDengineTest3.java @@ -0,0 +1,76 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import com.alibaba.datax.core.Engine; +import org.apache.commons.lang3.RandomStringUtils; +import org.junit.Before; +import org.junit.Test; + +import java.sql.*; +import java.text.SimpleDateFormat; +import java.util.Random; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class Mysql2TDengineTest3 { + + private static final String host1 = "192.168.56.105"; + private static final String host2 = "192.168.56.105"; + private static final Random random = new Random(System.currentTimeMillis()); + + @Test + public void test2() throws Throwable { + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/m2t-3.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + + @Before + public void before() throws SQLException { + // given + long ts_start = new Date(System.currentTimeMillis()).getTime(); + final int columnSize = 10; + final SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS"); + + final String url = "jdbc:mysql://" + host1 + ":3306/?useSSL=false&useUnicode=true&charset=UTF-8&generateSimpleParameterMetadata=true"; + try (Connection conn = DriverManager.getConnection(url, "root", "123456")) { + Statement stmt = conn.createStatement(); + + stmt.execute("drop database if exists db1"); + stmt.execute("create database if not exists db1"); + stmt.execute("use db1"); + stmt.execute("create table stb1(id int primary key AUTO_INCREMENT, " + + IntStream.range(1, columnSize).mapToObj(i -> "f" + i + " int").collect(Collectors.joining(",")) + ", " + + IntStream.range(1, columnSize).mapToObj(i -> "t" + i + " varchar(20)").collect(Collectors.joining(",")) + ", ts timestamp)"); + for (int i = 1; i <= 10; i++) { + String sql = "insert into stb1(" + + IntStream.range(1, columnSize).mapToObj(index -> "f" + index).collect(Collectors.joining(",")) + ", " + + IntStream.range(1, columnSize).mapToObj(index -> "t" + index).collect(Collectors.joining(",")) + + ", ts) values(" + + IntStream.range(1, columnSize).mapToObj(index -> random.nextInt(10) + "").collect(Collectors.joining(",")) + + "," + + IntStream.range(1, columnSize).mapToObj(index -> "'" + RandomStringUtils.randomAlphanumeric(15) + "'").collect(Collectors.joining(",")) + + ", '" + sdf.format(new Date(ts_start + i * 1000)) + "')"; + stmt.execute(sql); + } + + stmt.close(); + } + + final String url2 = "jdbc:TAOS://" + host2 + ":6030/"; + try (Connection conn = DriverManager.getConnection(url2, "root", "taosdata")) { + Statement stmt = conn.createStatement(); + + stmt.execute("drop database if exists db2"); + stmt.execute("create database if not exists db2"); + stmt.execute("create table db2.stb2(ts timestamp, " + + IntStream.range(1, 101).mapToObj(i -> "f" + i + " int").collect(Collectors.joining(",")) + ") tags(" + + IntStream.range(1, 101).mapToObj(i -> "t" + i + " nchar(20)").collect(Collectors.joining(",")) + + ")" + ); + + stmt.close(); + } + } + + +} diff --git a/tdenginewriter/src/test/resources/m2t-2.json b/tdenginewriter/src/test/resources/m2t-2.json new file mode 100644 index 00000000..3fd3496c --- /dev/null +++ b/tdenginewriter/src/test/resources/m2t-2.json @@ -0,0 +1,57 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "mysqlreader", + "parameter": { + "username": "root", + "password": "123456", + "splitPk": "id", + "connection": [ + { + "querySql": [ + "select t1 as tbname, ts, f1,f2,f3,f4,t1 from stb1" + ], + "jdbcUrl": [ + "jdbc:mysql://192.168.56.105:3306/db1?useSSL=false&useUnicode=true&characterEncoding=utf8" + ] + } + ] + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "tbname", + "ts", + "f1", + "f2", + "f3", + "f4", + "t1" + ], + "connection": [ + { + "table": [ + "stb2" + ], + "jdbcUrl": "jdbc:TAOS://192.168.56.105:6030/db2" + } + ], + "batchSize": 1000, + "ignoreTagsUnmatched": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/m2t-3.json b/tdenginewriter/src/test/resources/m2t-3.json new file mode 100644 index 00000000..0164bd85 --- /dev/null +++ b/tdenginewriter/src/test/resources/m2t-3.json @@ -0,0 +1,53 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "mysqlreader", + "parameter": { + "username": "root", + "password": "123456", + "splitPk": "id", + "connection": [ + { + "querySql": [ + "select ts,f1,t1 from stb1" + ], + "jdbcUrl": [ + "jdbc:mysql://192.168.56.105:3306/db1?useSSL=false&useUnicode=true&characterEncoding=utf8" + ] + } + ] + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "ts", + "f1", + "t1" + ], + "connection": [ + { + "table": [ + "stb2" + ], + "jdbcUrl": "jdbc:TAOS://192.168.56.105:6030/db2" + } + ], + "batchSize": 1000, + "ignoreTagsUnmatched": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file