mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 20:39:48 +08:00
fix: get tag value could be null
This commit is contained in:
parent
5d747e5033
commit
1d382da3b4
@ -34,8 +34,18 @@ public class Schema3_0Manager extends SchemaManager {
|
||||
Map<String, TableMeta> tableMetas = new HashMap<>();
|
||||
|
||||
try (Statement stmt = conn.createStatement()) {
|
||||
ResultSet rs = stmt.executeQuery("select * from " + Constants.INFORMATION_SCHEMA + Constants.INFORMATION_SCHEMA_COMMA
|
||||
+ Constants.INFORMATION_SCHEMA_TABLE_INS_STABLES + " where db_name = " + getDbnameForSqlQuery());
|
||||
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("select * from ")
|
||||
.append(Constants.INFORMATION_SCHEMA)
|
||||
.append(Constants.INFORMATION_SCHEMA_COMMA)
|
||||
.append(Constants.INFORMATION_SCHEMA_TABLE_INS_STABLES)
|
||||
.append(" where db_name = ")
|
||||
.append(getDbnameForSqlQuery())
|
||||
.append(" and stable_name in ")
|
||||
.append(tables.stream().map(t -> "'" + t + "'").collect(Collectors.joining(",", "(", ")")));
|
||||
|
||||
ResultSet rs = stmt.executeQuery(sb.toString());
|
||||
while (rs.next()) {
|
||||
TableMeta tableMeta = buildSupTableMeta(rs);
|
||||
if (!tables.contains(tableMeta.tbname))
|
||||
@ -43,8 +53,17 @@ public class Schema3_0Manager extends SchemaManager {
|
||||
tableMetas.put(tableMeta.tbname, tableMeta);
|
||||
}
|
||||
|
||||
rs = stmt.executeQuery("select * from " + Constants.INFORMATION_SCHEMA + Constants.INFORMATION_SCHEMA_COMMA
|
||||
+ Constants.INFORMATION_SCHEMA_TABLE_INS_TABLES + " where db_name = " + getDbnameForSqlQuery());
|
||||
sb = new StringBuilder();
|
||||
sb.append("select * from ")
|
||||
.append(Constants.INFORMATION_SCHEMA)
|
||||
.append(Constants.INFORMATION_SCHEMA_COMMA)
|
||||
.append(Constants.INFORMATION_SCHEMA_TABLE_INS_TABLES)
|
||||
.append(" where db_name = ")
|
||||
.append(getDbnameForSqlQuery())
|
||||
.append(" and table_name in ")
|
||||
.append(tables.stream().map(t -> "'" + t + "'").collect(Collectors.joining(",", "(", ")")));
|
||||
|
||||
rs = stmt.executeQuery(sb.toString());
|
||||
while (rs.next()) {
|
||||
TableMeta tableMeta = buildSubTableMeta(rs);
|
||||
if (!tables.contains(tableMeta.tbname))
|
||||
@ -54,7 +73,8 @@ public class Schema3_0Manager extends SchemaManager {
|
||||
|
||||
for (String tbname : tables) {
|
||||
if (!tableMetas.containsKey(tbname)) {
|
||||
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, "table metadata of " + tbname + " is empty!");
|
||||
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION,
|
||||
"table metadata of " + tbname + " is empty!");
|
||||
}
|
||||
}
|
||||
} catch (SQLException e) {
|
||||
@ -74,7 +94,7 @@ public class Schema3_0Manager extends SchemaManager {
|
||||
tableMeta.tbname = rs.getString(Constants.TABLE_META_SUP_TABLE_NAME);
|
||||
tableMeta.columns = rs.getInt(Constants.TABLE_META_COLUMNS);
|
||||
tableMeta.tags = rs.getInt(Constants.TABLE_META_TAGS);
|
||||
// tableMeta.tables = rs.getInt("tables"); // 直接从 ins_stables 查不到子表数量
|
||||
// tableMeta.tables = rs.getInt("tables"); // 直接从 ins_stables 查不到子表数量
|
||||
LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta);
|
||||
return tableMeta;
|
||||
}
|
||||
@ -109,8 +129,10 @@ public class Schema3_0Manager extends SchemaManager {
|
||||
if (this.precision != null)
|
||||
return this.precision;
|
||||
try (Statement stmt = conn.createStatement()) {
|
||||
ResultSet rs = stmt.executeQuery("select * from " + Constants.INFORMATION_SCHEMA + Constants.INFORMATION_SCHEMA_COMMA
|
||||
+ Constants.INFORMATION_SCHEMA_TABLE_INS_DATABASES + " where name = " + getDbnameForSqlQuery());
|
||||
ResultSet rs = stmt.executeQuery(
|
||||
"select * from " + Constants.INFORMATION_SCHEMA + Constants.INFORMATION_SCHEMA_COMMA +
|
||||
Constants.INFORMATION_SCHEMA_TABLE_INS_DATABASES + " where name = " +
|
||||
getDbnameForSqlQuery());
|
||||
while (rs.next()) {
|
||||
String precision = rs.getString(Constants.DATABASE_META_PRECISION);
|
||||
switch (precision) {
|
||||
|
@ -114,13 +114,16 @@ public final class SchemaCache {
|
||||
|
||||
private Object getTagValue(String tableName, String tagName) {
|
||||
String sql = "select " + tagName + " from " + tableName;
|
||||
Object tagValue = null;
|
||||
try (Statement stmt = conn.createStatement()) {
|
||||
ResultSet rs = stmt.executeQuery(sql);
|
||||
rs.next();
|
||||
return rs.getObject(tagName);
|
||||
tagValue = rs.getObject(tagName);
|
||||
} catch (SQLException e) {
|
||||
throw DataXException.asDataXException("failed to get tag value, cause: {" + e.getMessage() + "}");
|
||||
log.error("failed to get tag value, use NULL, cause: {" + e.getMessage() + "}");
|
||||
}
|
||||
|
||||
return tagValue;
|
||||
}
|
||||
|
||||
private ColumnMeta buildColumnMeta(ResultSet rs, boolean isPrimaryKey) throws SQLException {
|
||||
|
76
tdengine30writer/src/test/resources/meters.json
Normal file
76
tdengine30writer/src/test/resources/meters.json
Normal file
@ -0,0 +1,76 @@
|
||||
{
|
||||
"job": {
|
||||
"content": [
|
||||
{
|
||||
"reader": {
|
||||
"name": "streamreader",
|
||||
"parameter": {
|
||||
"column": [
|
||||
{
|
||||
"type": "string",
|
||||
"value": "d1"
|
||||
},
|
||||
{
|
||||
"type": "date",
|
||||
"value": "2024-04-19 01:02:03.456",
|
||||
"dateFormat": "yyyy-MM-dd HH:mm:ss.SSS"
|
||||
},
|
||||
{
|
||||
"type": "double",
|
||||
"value": 220.23
|
||||
},
|
||||
{
|
||||
"type": "long",
|
||||
"value": 15
|
||||
},
|
||||
{
|
||||
"type": "long",
|
||||
"value": 90.5
|
||||
},
|
||||
{
|
||||
"type": "long",
|
||||
"value": 1
|
||||
},
|
||||
{
|
||||
"type": "string",
|
||||
"value": "北京朝阳望京"
|
||||
}
|
||||
],
|
||||
"sliceRecordCount": 10
|
||||
}
|
||||
},
|
||||
"writer": {
|
||||
"name": "tdengine30writer",
|
||||
"parameter": {
|
||||
"username": "root",
|
||||
"password": "taosdata",
|
||||
"column": [
|
||||
"tbname",
|
||||
"ts",
|
||||
"current",
|
||||
"voltage",
|
||||
"phase",
|
||||
"groupid",
|
||||
"location"
|
||||
],
|
||||
"connection": [
|
||||
{
|
||||
"table": [
|
||||
"meters"
|
||||
],
|
||||
"jdbcUrl": "jdbc:TAOS-RS://192.168.0.201:6041/zyyang"
|
||||
}
|
||||
],
|
||||
"batchSize": 100,
|
||||
"ignoreTagsUnmatched": true
|
||||
}
|
||||
}
|
||||
}
|
||||
],
|
||||
"setting": {
|
||||
"speed": {
|
||||
"channel": 1
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user