Merge pull request #49 from taosdata/dev/zyyang

fix: get tag value could be null
This commit is contained in:
zyyang 2024-04-19 21:01:16 +08:00 committed by GitHub
commit edbe2e1d6f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
3 changed files with 111 additions and 10 deletions

View File

@ -34,8 +34,18 @@ public class Schema3_0Manager extends SchemaManager {
Map<String, TableMeta> tableMetas = new HashMap<>();
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("select * from " + Constants.INFORMATION_SCHEMA + Constants.INFORMATION_SCHEMA_COMMA
+ Constants.INFORMATION_SCHEMA_TABLE_INS_STABLES + " where db_name = " + getDbnameForSqlQuery());
StringBuilder sb = new StringBuilder();
sb.append("select * from ")
.append(Constants.INFORMATION_SCHEMA)
.append(Constants.INFORMATION_SCHEMA_COMMA)
.append(Constants.INFORMATION_SCHEMA_TABLE_INS_STABLES)
.append(" where db_name = ")
.append(getDbnameForSqlQuery())
.append(" and stable_name in ")
.append(tables.stream().map(t -> "'" + t + "'").collect(Collectors.joining(",", "(", ")")));
ResultSet rs = stmt.executeQuery(sb.toString());
while (rs.next()) {
TableMeta tableMeta = buildSupTableMeta(rs);
if (!tables.contains(tableMeta.tbname))
@ -43,8 +53,17 @@ public class Schema3_0Manager extends SchemaManager {
tableMetas.put(tableMeta.tbname, tableMeta);
}
rs = stmt.executeQuery("select * from " + Constants.INFORMATION_SCHEMA + Constants.INFORMATION_SCHEMA_COMMA
+ Constants.INFORMATION_SCHEMA_TABLE_INS_TABLES + " where db_name = " + getDbnameForSqlQuery());
sb = new StringBuilder();
sb.append("select * from ")
.append(Constants.INFORMATION_SCHEMA)
.append(Constants.INFORMATION_SCHEMA_COMMA)
.append(Constants.INFORMATION_SCHEMA_TABLE_INS_TABLES)
.append(" where db_name = ")
.append(getDbnameForSqlQuery())
.append(" and table_name in ")
.append(tables.stream().map(t -> "'" + t + "'").collect(Collectors.joining(",", "(", ")")));
rs = stmt.executeQuery(sb.toString());
while (rs.next()) {
TableMeta tableMeta = buildSubTableMeta(rs);
if (!tables.contains(tableMeta.tbname))
@ -54,7 +73,8 @@ public class Schema3_0Manager extends SchemaManager {
for (String tbname : tables) {
if (!tableMetas.containsKey(tbname)) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, "table metadata of " + tbname + " is empty!");
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION,
"table metadata of " + tbname + " is empty!");
}
}
} catch (SQLException e) {
@ -74,7 +94,7 @@ public class Schema3_0Manager extends SchemaManager {
tableMeta.tbname = rs.getString(Constants.TABLE_META_SUP_TABLE_NAME);
tableMeta.columns = rs.getInt(Constants.TABLE_META_COLUMNS);
tableMeta.tags = rs.getInt(Constants.TABLE_META_TAGS);
// tableMeta.tables = rs.getInt("tables"); // 直接从 ins_stables 查不到子表数量
// tableMeta.tables = rs.getInt("tables"); // 直接从 ins_stables 查不到子表数量
LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta);
return tableMeta;
}
@ -109,8 +129,10 @@ public class Schema3_0Manager extends SchemaManager {
if (this.precision != null)
return this.precision;
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("select * from " + Constants.INFORMATION_SCHEMA + Constants.INFORMATION_SCHEMA_COMMA
+ Constants.INFORMATION_SCHEMA_TABLE_INS_DATABASES + " where name = " + getDbnameForSqlQuery());
ResultSet rs = stmt.executeQuery(
"select * from " + Constants.INFORMATION_SCHEMA + Constants.INFORMATION_SCHEMA_COMMA +
Constants.INFORMATION_SCHEMA_TABLE_INS_DATABASES + " where name = " +
getDbnameForSqlQuery());
while (rs.next()) {
String precision = rs.getString(Constants.DATABASE_META_PRECISION);
switch (precision) {

View File

@ -114,13 +114,16 @@ public final class SchemaCache {
private Object getTagValue(String tableName, String tagName) {
String sql = "select " + tagName + " from " + tableName;
Object tagValue = null;
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery(sql);
rs.next();
return rs.getObject(tagName);
tagValue = rs.getObject(tagName);
} catch (SQLException e) {
throw DataXException.asDataXException("failed to get tag value, cause: {" + e.getMessage() + "}");
log.error("failed to get tag value, use NULL, cause: {" + e.getMessage() + "}");
}
return tagValue;
}
private ColumnMeta buildColumnMeta(ResultSet rs, boolean isPrimaryKey) throws SQLException {

View File

@ -0,0 +1,76 @@
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"type": "string",
"value": "d1"
},
{
"type": "date",
"value": "2024-04-19 01:02:03.456",
"dateFormat": "yyyy-MM-dd HH:mm:ss.SSS"
},
{
"type": "double",
"value": 220.23
},
{
"type": "long",
"value": 15
},
{
"type": "long",
"value": 90.5
},
{
"type": "long",
"value": 1
},
{
"type": "string",
"value": "北京朝阳望京"
}
],
"sliceRecordCount": 10
}
},
"writer": {
"name": "tdengine30writer",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"tbname",
"ts",
"current",
"voltage",
"phase",
"groupid",
"location"
],
"connection": [
{
"table": [
"meters"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.0.201:6041/zyyang"
}
],
"batchSize": 100,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}