From 1d382da3b4985e501b228685541c1cdcc8e1206f Mon Sep 17 00:00:00 2001 From: zyyang Date: Fri, 19 Apr 2024 20:59:51 +0800 Subject: [PATCH] fix: get tag value could be null --- .../tdengine30writer/Schema3_0Manager.java | 38 ++++++++-- .../writer/tdengine30writer/SchemaCache.java | 7 +- .../src/test/resources/meters.json | 76 +++++++++++++++++++ 3 files changed, 111 insertions(+), 10 deletions(-) create mode 100644 tdengine30writer/src/test/resources/meters.json diff --git a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/Schema3_0Manager.java b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/Schema3_0Manager.java index d3e2ade8..221f9ed9 100644 --- a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/Schema3_0Manager.java +++ b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/Schema3_0Manager.java @@ -34,8 +34,18 @@ public class Schema3_0Manager extends SchemaManager { Map tableMetas = new HashMap<>(); try (Statement stmt = conn.createStatement()) { - ResultSet rs = stmt.executeQuery("select * from " + Constants.INFORMATION_SCHEMA + Constants.INFORMATION_SCHEMA_COMMA - + Constants.INFORMATION_SCHEMA_TABLE_INS_STABLES + " where db_name = " + getDbnameForSqlQuery()); + + StringBuilder sb = new StringBuilder(); + sb.append("select * from ") + .append(Constants.INFORMATION_SCHEMA) + .append(Constants.INFORMATION_SCHEMA_COMMA) + .append(Constants.INFORMATION_SCHEMA_TABLE_INS_STABLES) + .append(" where db_name = ") + .append(getDbnameForSqlQuery()) + .append(" and stable_name in ") + .append(tables.stream().map(t -> "'" + t + "'").collect(Collectors.joining(",", "(", ")"))); + + ResultSet rs = stmt.executeQuery(sb.toString()); while (rs.next()) { TableMeta tableMeta = buildSupTableMeta(rs); if (!tables.contains(tableMeta.tbname)) @@ -43,8 +53,17 @@ public class Schema3_0Manager extends SchemaManager { tableMetas.put(tableMeta.tbname, tableMeta); } - rs = stmt.executeQuery("select * from " + Constants.INFORMATION_SCHEMA + Constants.INFORMATION_SCHEMA_COMMA - + Constants.INFORMATION_SCHEMA_TABLE_INS_TABLES + " where db_name = " + getDbnameForSqlQuery()); + sb = new StringBuilder(); + sb.append("select * from ") + .append(Constants.INFORMATION_SCHEMA) + .append(Constants.INFORMATION_SCHEMA_COMMA) + .append(Constants.INFORMATION_SCHEMA_TABLE_INS_TABLES) + .append(" where db_name = ") + .append(getDbnameForSqlQuery()) + .append(" and table_name in ") + .append(tables.stream().map(t -> "'" + t + "'").collect(Collectors.joining(",", "(", ")"))); + + rs = stmt.executeQuery(sb.toString()); while (rs.next()) { TableMeta tableMeta = buildSubTableMeta(rs); if (!tables.contains(tableMeta.tbname)) @@ -54,7 +73,8 @@ public class Schema3_0Manager extends SchemaManager { for (String tbname : tables) { if (!tableMetas.containsKey(tbname)) { - throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, "table metadata of " + tbname + " is empty!"); + throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, + "table metadata of " + tbname + " is empty!"); } } } catch (SQLException e) { @@ -74,7 +94,7 @@ public class Schema3_0Manager extends SchemaManager { tableMeta.tbname = rs.getString(Constants.TABLE_META_SUP_TABLE_NAME); tableMeta.columns = rs.getInt(Constants.TABLE_META_COLUMNS); tableMeta.tags = rs.getInt(Constants.TABLE_META_TAGS); -// tableMeta.tables = rs.getInt("tables"); // 直接从 ins_stables 查不到子表数量 + // tableMeta.tables = rs.getInt("tables"); // 直接从 ins_stables 查不到子表数量 LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta); return tableMeta; } @@ -109,8 +129,10 @@ public class Schema3_0Manager extends SchemaManager { if (this.precision != null) return this.precision; try (Statement stmt = conn.createStatement()) { - ResultSet rs = stmt.executeQuery("select * from " + Constants.INFORMATION_SCHEMA + Constants.INFORMATION_SCHEMA_COMMA - + Constants.INFORMATION_SCHEMA_TABLE_INS_DATABASES + " where name = " + getDbnameForSqlQuery()); + ResultSet rs = stmt.executeQuery( + "select * from " + Constants.INFORMATION_SCHEMA + Constants.INFORMATION_SCHEMA_COMMA + + Constants.INFORMATION_SCHEMA_TABLE_INS_DATABASES + " where name = " + + getDbnameForSqlQuery()); while (rs.next()) { String precision = rs.getString(Constants.DATABASE_META_PRECISION); switch (precision) { diff --git a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaCache.java b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaCache.java index ebcd623a..852f7fd8 100644 --- a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaCache.java +++ b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaCache.java @@ -114,13 +114,16 @@ public final class SchemaCache { private Object getTagValue(String tableName, String tagName) { String sql = "select " + tagName + " from " + tableName; + Object tagValue = null; try (Statement stmt = conn.createStatement()) { ResultSet rs = stmt.executeQuery(sql); rs.next(); - return rs.getObject(tagName); + tagValue = rs.getObject(tagName); } catch (SQLException e) { - throw DataXException.asDataXException("failed to get tag value, cause: {" + e.getMessage() + "}"); + log.error("failed to get tag value, use NULL, cause: {" + e.getMessage() + "}"); } + + return tagValue; } private ColumnMeta buildColumnMeta(ResultSet rs, boolean isPrimaryKey) throws SQLException { diff --git a/tdengine30writer/src/test/resources/meters.json b/tdengine30writer/src/test/resources/meters.json new file mode 100644 index 00000000..fc71d757 --- /dev/null +++ b/tdengine30writer/src/test/resources/meters.json @@ -0,0 +1,76 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "streamreader", + "parameter": { + "column": [ + { + "type": "string", + "value": "d1" + }, + { + "type": "date", + "value": "2024-04-19 01:02:03.456", + "dateFormat": "yyyy-MM-dd HH:mm:ss.SSS" + }, + { + "type": "double", + "value": 220.23 + }, + { + "type": "long", + "value": 15 + }, + { + "type": "long", + "value": 90.5 + }, + { + "type": "long", + "value": 1 + }, + { + "type": "string", + "value": "北京朝阳望京" + } + ], + "sliceRecordCount": 10 + } + }, + "writer": { + "name": "tdengine30writer", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "tbname", + "ts", + "current", + "voltage", + "phase", + "groupid", + "location" + ], + "connection": [ + { + "table": [ + "meters" + ], + "jdbcUrl": "jdbc:TAOS-RS://192.168.0.201:6041/zyyang" + } + ], + "batchSize": 100, + "ignoreTagsUnmatched": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file