diff --git a/tdengine30reader/pom.xml b/tdengine30reader/pom.xml
index 2a82fc2b..5754fb2d 100644
--- a/tdengine30reader/pom.xml
+++ b/tdengine30reader/pom.xml
@@ -37,7 +37,7 @@
com.taosdata.jdbc
taos-jdbcdriver
- 3.2.5
+ 3.2.9
junit
diff --git a/tdengine30writer/pom.xml b/tdengine30writer/pom.xml
index 94ef94f9..4b71a1b5 100644
--- a/tdengine30writer/pom.xml
+++ b/tdengine30writer/pom.xml
@@ -22,7 +22,7 @@
com.taosdata.jdbc
taos-jdbcdriver
- 3.2.5
+ 3.2.9
org.apache.commons
diff --git a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/DefaultDataHandler.java b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/DefaultDataHandler.java
index 010795ef..3d02df80 100644
--- a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/DefaultDataHandler.java
+++ b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/DefaultDataHandler.java
@@ -33,32 +33,25 @@ public class DefaultDataHandler implements DataHandler {
}
private final TaskPluginCollector taskPluginCollector;
- private String username;
- private String password;
- private String jdbcUrl;
- private int batchSize;
- private boolean ignoreTagsUnmatched;
+ private final String username;
+ private final String password;
+ private final String jdbcUrl;
+ private final int batchSize;
+ private final boolean ignoreTagsUnmatched;
- private List tables;
- private List columns;
+ private final List tables;
+ private final List columns;
- private Map tableMetas;
private SchemaManager schemaManager;
+ private final SchemaCache schemaCache;
- public void setTableMetas(Map tableMetas) {
- this.tableMetas = tableMetas;
- }
-
- public void setTbnameColumnMetasMap(Map> tbnameColumnMetasMap) {
- this.tbnameColumnMetasMap = tbnameColumnMetasMap;
- }
+ //private Map tableMetas;
+ private Map> tbnameColumnMetasMap;
public void setSchemaManager(SchemaManager schemaManager) {
this.schemaManager = schemaManager;
}
- private Map> tbnameColumnMetasMap;
-
public DefaultDataHandler(Configuration configuration, TaskPluginCollector taskPluginCollector) {
this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME);
this.password = configuration.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD);
@@ -66,8 +59,11 @@ public class DefaultDataHandler implements DataHandler {
this.batchSize = configuration.getInt(Key.BATCH_SIZE, Constants.DEFAULT_BATCH_SIZE);
this.tables = configuration.getList(Key.TABLE, String.class);
this.columns = configuration.getList(Key.COLUMN, String.class);
- this.ignoreTagsUnmatched = configuration.getBool(Key.IGNORE_TAGS_UNMATCHED, Constants.DEFAULT_IGNORE_TAGS_UNMATCHED);
+ this.ignoreTagsUnmatched = configuration.getBool(Key.IGNORE_TAGS_UNMATCHED,
+ Constants.DEFAULT_IGNORE_TAGS_UNMATCHED);
this.taskPluginCollector = taskPluginCollector;
+
+ this.schemaCache = SchemaCache.getInstance(configuration);
}
@Override
@@ -77,14 +73,15 @@ public class DefaultDataHandler implements DataHandler {
try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established.");
- String dbname = getDbnameFromJdbcUrl(jdbcUrl);
+ String dbname = TDengineWriter.parseDatabaseFromJdbcUrl(jdbcUrl);
Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery("select " + Constants.SERVER_VERSION);
// 光标移动一行
resultSet.next();
String serverVersion = resultSet.getString(Constants.SERVER_VERSION);
if (StringUtils.isEmpty(dbname)) {
- throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, "error dbname parsed from jdbcUrl");
+ throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION,
+ "error dbname parsed from jdbcUrl");
}
LOG.info("tdengine server version[{}] dbname[{}]", serverVersion, dbname);
// 版本判断确定使用 SchemaManager 的示例
@@ -93,21 +90,6 @@ public class DefaultDataHandler implements DataHandler {
} else {
this.schemaManager = new Schema3_0Manager(conn, dbname);
}
- // prepare table_name -> table_meta
- this.tableMetas = schemaManager.loadTableMeta(tables);
- // prepare table_name -> column_meta
- this.tbnameColumnMetasMap = schemaManager.loadColumnMetas(tables);
- // filter column
- for (String tableName : tbnameColumnMetasMap.keySet()) {
- List columnMetaList = tbnameColumnMetasMap.get(tableName);
- Iterator iterator = columnMetaList.iterator();
- while (iterator.hasNext()) {
- ColumnMeta columnMeta = iterator.next();
- if (!this.columns.contains(columnMeta.field)) {
- iterator.remove();
- }
- }
- }
List recordBatch = new ArrayList<>();
Record record;
@@ -141,26 +123,13 @@ public class DefaultDataHandler implements DataHandler {
}
if (affectedRows != count) {
- LOG.error("write record missing or incorrect happened, affectedRows: " + affectedRows + ", total: " + count);
+ LOG.error(
+ "write record missing or incorrect happened, affectedRows: " + affectedRows + ", total: " + count);
}
return affectedRows;
}
- /**
- * 从 jdbcUrl 中解析出数据库名称
- * @param jdbcUrl 格式是 jdbc:://:/[?可选参数]
- * @return 数据库名称
- */
- private static String getDbnameFromJdbcUrl(String jdbcUrl) {
- int questionMarkIndex = -1;
- if (jdbcUrl.contains("?")) {
- questionMarkIndex = jdbcUrl.indexOf("?");
- }
- return questionMarkIndex == -1 ? jdbcUrl.substring(jdbcUrl.lastIndexOf("/") + 1) :
- jdbcUrl.substring(jdbcUrl.lastIndexOf("/") + 1, questionMarkIndex);
- }
-
private int writeEachRow(Connection conn, List recordBatch) {
int affectedRows = 0;
for (Record record : recordBatch) {
@@ -190,7 +159,7 @@ public class DefaultDataHandler implements DataHandler {
public int writeBatch(Connection conn, List recordBatch) throws SQLException {
int affectedRows = 0;
for (String table : tables) {
- TableMeta tableMeta = tableMetas.get(table);
+ TableMeta tableMeta = this.schemaCache.getTableMeta(table);
switch (tableMeta.tableType) {
case SUP_TABLE: {
if (columns.contains("tbname")) {
@@ -212,17 +181,15 @@ public class DefaultDataHandler implements DataHandler {
return affectedRows;
}
- private int writeBatchToSupTableWithoutTbname(Connection conn, String table, List recordBatch, Map tag2Tbname) throws SQLException {
- List columnMetas = tbnameColumnMetasMap.get(table);
+ private int writeBatchToSupTableWithoutTbname(Connection conn, String table, List recordBatch,
+ Map tag2Tbname) throws SQLException {
+ List columnMetas = schemaCache.getColumnMetaList(table);
List subTableExist = filterSubTableExistRecords(recordBatch, columnMetas, tag2Tbname);
List subTableNotExist = filterSubTableNotExistRecords(recordBatch, columnMetas, tag2Tbname);
int affectedRows = 0;
Map> subTableRecordsMap = splitRecords(subTableExist, columnMetas, tag2Tbname);
- List subTables = new ArrayList<>(subTableRecordsMap.keySet());
- this.tbnameColumnMetasMap.putAll(schemaManager.loadColumnMetas(subTables));
-
for (String subTable : subTableRecordsMap.keySet()) {
List subTableRecords = subTableRecordsMap.get(subTable);
affectedRows += writeBatchToNormalTable(conn, subTable, subTableRecords);
@@ -232,21 +199,24 @@ public class DefaultDataHandler implements DataHandler {
return affectedRows;
}
- private List filterSubTableExistRecords(List recordBatch, List columnMetas, Map tag2Tbname) {
+ private List filterSubTableExistRecords(List recordBatch, List columnMetas,
+ Map tag2Tbname) {
return recordBatch.stream().filter(record -> {
String tagStr = getTagString(columnMetas, record);
return tag2Tbname.containsKey(tagStr);
}).collect(Collectors.toList());
}
- private List filterSubTableNotExistRecords(List recordBatch, List columnMetas, Map tag2Tbname) {
+ private List filterSubTableNotExistRecords(List recordBatch, List columnMetas,
+ Map tag2Tbname) {
return recordBatch.stream().filter(record -> {
String tagStr = getTagString(columnMetas, record);
return !tag2Tbname.containsKey(tagStr);
}).collect(Collectors.toList());
}
- private Map> splitRecords(List subTableExist, List columnMetas, Map tag2Tbname) {
+ private Map> splitRecords(List subTableExist, List columnMetas,
+ Map tag2Tbname) {
Map> ret = new HashMap<>();
for (Record record : subTableExist) {
String tagstr = getTagString(columnMetas, record);
@@ -278,7 +248,9 @@ public class DefaultDataHandler implements DataHandler {
return column.asString();
}
} catch (Exception e) {
- LOG.error("failed to get Tag, colIndex: " + colIndex + ", ColumnMeta: " + columnMeta + ", record: " + record, e);
+ LOG.error(
+ "failed to get Tag, colIndex: " + colIndex + ", ColumnMeta: " + columnMeta + ", record: " +
+ record, e);
}
}
return "";
@@ -291,30 +263,32 @@ public class DefaultDataHandler implements DataHandler {
* record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
*/
private int writeBatchToSupTableBySQL(Connection conn, String table, List recordBatch) throws SQLException {
- List columnMetas = this.tbnameColumnMetasMap.get(table);
+ List columnMetas = this.schemaCache.getColumnMetaList(table);
StringBuilder sb = new StringBuilder("insert into");
for (Record record : recordBatch) {
- sb.append(" `").append(record.getColumn(indexOf("tbname")).asString())
- .append("` using ").append(table)
- .append(" tags")
- .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
- return colMeta.isTag;
- }).map(colMeta -> {
- return buildColumnValue(colMeta, record);
- }).collect(Collectors.joining(",", "(", ")")))
- .append(" ")
- .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
- return !colMeta.isTag;
- }).map(colMeta -> {
- return colMeta.field;
- }).collect(Collectors.joining(",", "(", ")")))
- .append(" values")
- .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
- return !colMeta.isTag;
- }).map(colMeta -> {
- return buildColumnValue(colMeta, record);
- }).collect(Collectors.joining(",", "(", ")")));
+ sb.append(" `")
+ .append(record.getColumn(indexOf("tbname")).asString())
+ .append("` using ")
+ .append(table)
+ .append(" tags")
+ .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+ return colMeta.isTag;
+ }).map(colMeta -> {
+ return buildColumnValue(colMeta, record);
+ }).collect(Collectors.joining(",", "(", ")")))
+ .append(" ")
+ .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+ return !colMeta.isTag;
+ }).map(colMeta -> {
+ return colMeta.field;
+ }).collect(Collectors.joining(",", "(", ")")))
+ .append(" values")
+ .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+ return !colMeta.isTag;
+ }).map(colMeta -> {
+ return buildColumnValue(colMeta, record);
+ }).collect(Collectors.joining(",", "(", ")")));
}
String sql = sb.toString();
@@ -371,33 +345,35 @@ public class DefaultDataHandler implements DataHandler {
* table: ["stb1"], column: ["ts", "f1", "f2", "t1"]
* data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => schemaless: stb1,t1=t1 f1=f1,f2=f2 ts
*/
- private int writeBatchToSupTableBySchemaless(Connection conn, String table, List recordBatch) throws SQLException {
+ private int writeBatchToSupTableBySchemaless(Connection conn, String table,
+ List recordBatch) throws SQLException {
int count = 0;
TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
- List columnMetaList = this.tbnameColumnMetasMap.get(table);
+ List columnMetaList = this.schemaCache.getColumnMetaList(table);
ColumnMeta ts = columnMetaList.stream().filter(colMeta -> colMeta.isPrimaryKey).findFirst().get();
List lines = new ArrayList<>();
for (Record record : recordBatch) {
StringBuilder sb = new StringBuilder();
- sb.append(table).append(",")
- .append(columnMetaList.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
- return colMeta.isTag;
- }).map(colMeta -> {
- String value = record.getColumn(indexOf(colMeta.field)).asString();
- if (value.contains(" "))
- value = value.replace(" ", "\\ ");
- return colMeta.field + "=" + value;
- }).collect(Collectors.joining(",")))
- .append(" ")
- .append(columnMetaList.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
- return !colMeta.isTag && !colMeta.isPrimaryKey;
- }).map(colMeta -> {
- return colMeta.field + "=" + buildSchemalessColumnValue(colMeta, record);
-// return colMeta.field + "=" + record.getColumn(indexOf(colMeta.field)).asString();
- }).collect(Collectors.joining(",")))
- .append(" ");
+ sb.append(table)
+ .append(",")
+ .append(columnMetaList.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+ return colMeta.isTag;
+ }).map(colMeta -> {
+ String value = record.getColumn(indexOf(colMeta.field)).asString();
+ if (value.contains(" "))
+ value = value.replace(" ", "\\ ");
+ return colMeta.field + "=" + value;
+ }).collect(Collectors.joining(",")))
+ .append(" ")
+ .append(columnMetaList.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+ return !colMeta.isTag && !colMeta.isPrimaryKey;
+ }).map(colMeta -> {
+ return colMeta.field + "=" + buildSchemalessColumnValue(colMeta, record);
+ // return colMeta.field + "=" + record.getColumn(indexOf(colMeta.field)).asString();
+ }).collect(Collectors.joining(",")))
+ .append(" ");
// timestamp
Column column = record.getColumn(indexOf(ts.field));
Object tsValue = column.getRawData();
@@ -512,28 +488,33 @@ public class DefaultDataHandler implements DataHandler {
* insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)])
*/
private int writeBatchToSubTable(Connection conn, String table, List recordBatch) throws SQLException {
- List columnMetas = this.tbnameColumnMetasMap.get(table);
+ List columnMetas = this.schemaCache.getColumnMetaList(table);
StringBuilder sb = new StringBuilder();
- sb.append("insert into `").append(table).append("` ")
- .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
- return !colMeta.isTag;
- }).map(colMeta -> {
- return colMeta.field;
- }).collect(Collectors.joining(",", "(", ")")))
- .append(" values");
+ sb.append("insert into `")
+ .append(table)
+ .append("` ")
+ .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+ return !colMeta.isTag;
+ }).map(colMeta -> {
+ return colMeta.field;
+ }).collect(Collectors.joining(",", "(", ")")))
+ .append(" values");
int validRecords = 0;
for (Record record : recordBatch) {
if (columns.contains("tbname") && !table.equals(record.getColumn(indexOf("tbname")).asString()))
continue;
- boolean tagsAllMatch = columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
- return colMeta.isTag;
- }).allMatch(colMeta -> {
- Column column = record.getColumn(indexOf(colMeta.field));
- boolean equals = equals(column, colMeta);
- return equals;
- });
+ boolean tagsAllMatch = columnMetas.stream()
+ .filter(colMeta -> columns.contains(colMeta.field))
+ .filter(colMeta -> {
+ return colMeta.isTag;
+ })
+ .allMatch(colMeta -> {
+ Column column = record.getColumn(indexOf(colMeta.field));
+ boolean equals = equals(column, colMeta);
+ return equals;
+ });
if (ignoreTagsUnmatched && !tagsAllMatch)
continue;
@@ -582,20 +563,29 @@ public class DefaultDataHandler implements DataHandler {
* sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...)
*/
private int writeBatchToNormalTable(Connection conn, String table, List recordBatch) throws SQLException {
- List columnMetas = this.tbnameColumnMetasMap.get(table);
+ List columnMetas = this.schemaCache.getColumnMetaList(table);
StringBuilder sb = new StringBuilder();
- sb.append("insert into `").append(table)
- .append("` ")
- .append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
- return colMeta.field;
- }).collect(Collectors.joining(",", "(", ")")))
- .append(" values ");
+ sb.append("insert into `")
+ .append(table)
+ .append("` ")
+ .append(columnMetas.stream()
+ .filter(colMeta -> !colMeta.isTag)
+ .filter(colMeta -> columns.contains(colMeta.field))
+ .map(colMeta -> {
+ return colMeta.field;
+ })
+ .collect(Collectors.joining(",", "(", ")")))
+ .append(" values ");
for (Record record : recordBatch) {
- sb.append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
- return buildColumnValue(colMeta, record);
- }).collect(Collectors.joining(",", "(", ")")));
+ sb.append(columnMetas.stream()
+ .filter(colMeta -> !colMeta.isTag)
+ .filter(colMeta -> columns.contains(colMeta.field))
+ .map(colMeta -> {
+ return buildColumnValue(colMeta, record);
+ })
+ .collect(Collectors.joining(",", "(", ")")));
}
String sql = sb.toString();
diff --git a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaCache.java b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaCache.java
new file mode 100644
index 00000000..ebcd623a
--- /dev/null
+++ b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaCache.java
@@ -0,0 +1,138 @@
+package com.alibaba.datax.plugin.writer.tdengine30writer;
+
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.util.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.LinkedHashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.function.Predicate;
+
+/**
+ * Schema cache for TDengine 3.X
+ */
+public final class SchemaCache {
+ private static final Logger log = LoggerFactory.getLogger(TDengineWriter.Job.class);
+
+ private static volatile SchemaCache instance;
+
+ private static Configuration config;
+ private static Connection conn;
+ // table name -> TableMeta
+ private static final Map tableMetas = new LinkedHashMap<>();
+ // table name ->List
+ private static final Map> columnMetas = new LinkedHashMap<>();
+
+ private SchemaCache(Configuration config) {
+ SchemaCache.config = config;
+
+ // connect
+ final String user = config.getString(Key.USERNAME, Constants.DEFAULT_USERNAME);
+ final String pass = config.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD);
+
+ Configuration conn = Configuration.from(config.getList(Key.CONNECTION).get(0).toString());
+
+ final String url = conn.getString(Key.JDBC_URL);
+ try {
+ SchemaCache.conn = DriverManager.getConnection(url, user, pass);
+ } catch (SQLException e) {
+ throw DataXException.asDataXException(
+ "failed to connect to url: " + url + ", cause: {" + e.getMessage() + "}");
+ }
+
+ final String dbname = TDengineWriter.parseDatabaseFromJdbcUrl(url);
+ SchemaManager schemaManager = new Schema3_0Manager(SchemaCache.conn, dbname);
+
+ // init table meta cache and load
+ final List tables = conn.getList(Key.TABLE, String.class);
+ Map tableMetas = schemaManager.loadTableMeta(tables);
+
+ // init column meta cache
+ SchemaCache.tableMetas.putAll(tableMetas);
+ for (String table : tableMetas.keySet()) {
+ SchemaCache.columnMetas.put(table, new ArrayList<>());
+ }
+ }
+
+ public static SchemaCache getInstance(Configuration originConfig) {
+ if (instance == null) {
+ synchronized (SchemaCache.class) {
+ if (instance == null) {
+ instance = new SchemaCache(originConfig);
+ }
+ }
+ }
+ return instance;
+ }
+
+ public TableMeta getTableMeta(String table_name) {
+ return tableMetas.get(table_name);
+ }
+
+ public List getColumnMetaList(String tbname) {
+ if (columnMetas.get(tbname).isEmpty()) {
+ synchronized (SchemaCache.class) {
+ if (columnMetas.get(tbname).isEmpty()) {
+ List column_name = config.getList(Key.COLUMN, String.class);
+ List colMetaList = getColumnMetaListFromDb(tbname,
+ (colMeta) -> column_name.contains(colMeta.field));
+ columnMetas.get(tbname).addAll(colMetaList);
+ }
+ }
+ }
+ return columnMetas.get(tbname);
+ }
+
+ private List getColumnMetaListFromDb(String tableName, Predicate filter) {
+ List columnMetaList = columnMetas.get(tableName);
+
+ try (Statement stmt = conn.createStatement()) {
+ ResultSet rs = stmt.executeQuery("describe " + tableName);
+ for (int i = 0; rs.next(); i++) {
+ ColumnMeta columnMeta = buildColumnMeta(rs, i == 0);
+
+ if (filter.test(columnMeta))
+ columnMetaList.add(columnMeta);
+ }
+ } catch (SQLException e) {
+ throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
+ }
+
+ for (ColumnMeta colMeta : columnMetaList) {
+ if (!colMeta.isTag)
+ continue;
+ Object tagValue = getTagValue(tableName, colMeta.field);
+ colMeta.value = tagValue;
+ }
+
+ return columnMetaList;
+ }
+
+ private Object getTagValue(String tableName, String tagName) {
+ String sql = "select " + tagName + " from " + tableName;
+ try (Statement stmt = conn.createStatement()) {
+ ResultSet rs = stmt.executeQuery(sql);
+ rs.next();
+ return rs.getObject(tagName);
+ } catch (SQLException e) {
+ throw DataXException.asDataXException("failed to get tag value, cause: {" + e.getMessage() + "}");
+ }
+ }
+
+ private ColumnMeta buildColumnMeta(ResultSet rs, boolean isPrimaryKey) throws SQLException {
+ ColumnMeta columnMeta = new ColumnMeta();
+ columnMeta.field = rs.getString(Constants.COLUMN_META_FIELD);
+ columnMeta.type = rs.getString(Constants.COLUMN_META_TYPE);
+ columnMeta.length = rs.getInt(Constants.COLUMN_META_LENGTH);
+ columnMeta.note = rs.getString(Constants.COLUMN_META_NOTE);
+ columnMeta.isTag = Constants.COLUMN_META_NOTE_TAG.equals(columnMeta.note);
+ // columnMeta.isPrimaryKey = "ts".equals(columnMeta.field);
+ columnMeta.isPrimaryKey = isPrimaryKey;
+ return columnMeta;
+ }
+
+}
diff --git a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/TDengineWriter.java b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/TDengineWriter.java
index d0dde32b..3a128e7c 100644
--- a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/TDengineWriter.java
+++ b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/TDengineWriter.java
@@ -13,7 +13,6 @@ import java.util.ArrayList;
import java.util.List;
public class TDengineWriter extends Writer {
-
private static final String PEER_PLUGIN_NAME = "peerPluginName";
public static class Job extends Writer.Job {
@@ -29,29 +28,28 @@ public class TDengineWriter extends Writer {
// check username
String user = this.originalConfig.getString(Key.USERNAME);
if (StringUtils.isBlank(user))
- throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter ["
- + Key.USERNAME + "] is not set.");
+ throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE,
+ "The parameter [" + Key.USERNAME + "] is not set.");
// check password
String password = this.originalConfig.getString(Key.PASSWORD);
if (StringUtils.isBlank(password))
- throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter ["
- + Key.PASSWORD + "] is not set.");
+ throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE,
+ "The parameter [" + Key.PASSWORD + "] is not set.");
// check connection
List