diff --git a/tdenginereader/pom.xml b/tdenginereader/pom.xml
index 8c0f6645..ca444bce 100644
--- a/tdenginereader/pom.xml
+++ b/tdenginereader/pom.xml
@@ -32,7 +32,7 @@
 <groupId>com.taosdata.jdbc</groupId>
 <artifactId>taos-jdbcdriver</artifactId>
- <version>2.0.34</version>
+ <version>2.0.37</version>
diff --git a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/Key.java b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/Key.java
index eddf98eb..95b55386 100644
--- a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/Key.java
+++ b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/Key.java
@@ -7,7 +7,7 @@ public class Key {
// public static final String PORT = "port";
// public static final String DB = "db";
public static final String TABLE = "table";
- public static final String USER = "user";
+ public static final String USER = "username";
public static final String PASSWORD = "password";
public static final String CONNECTION = "connection";
// public static final String SQL = "sql";
diff --git a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java
index dfdce7b4..f00d879f 100644
--- a/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java
+++ b/tdenginereader/src/main/java/com/alibaba/datax/plugin/reader/TDengineReader.java
@@ -190,8 +190,9 @@ public class TDengineReader extends Reader {
@Override
public void startRead(RecordSender recordSender) {
try (Statement stmt = conn.createStatement()) {
- for (int i = 0; i < tables.size(); i++) {
- String sql = "select " + StringUtils.join(columns, ",") + " from " + tables.get(i) + " where _c0 >= " + startTime + " and _c0 < " + endTime;
+ for (String table : tables) {
+ String sql = "select " + StringUtils.join(columns, ",") + " from " + table
+ + " where _c0 >= " + startTime + " and _c0 < " + endTime;
ResultSet rs = stmt.executeQuery(sql);
ResultSetMetaData metaData = rs.getMetaData();
while (rs.next()) {
diff --git a/tdenginewriter/pom.xml b/tdenginewriter/pom.xml
index 054f2ef8..720ecbb8 100644
--- a/tdenginewriter/pom.xml
+++ b/tdenginewriter/pom.xml
@@ -19,11 +19,25 @@
+ <dependency>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ <version>1.2.58</version>
+ </dependency>
 <dependency>
 <groupId>com.taosdata.jdbc</groupId>
 <artifactId>taos-jdbcdriver</artifactId>
- <version>2.0.34</version>
+ <version>2.0.37</version>
+ <exclusions>
+ <exclusion>
+ <groupId>com.alibaba</groupId>
+ <artifactId>fastjson</artifactId>
+ </exclusion>
+ </exclusions>
 </dependency>
 <dependency>
 <groupId>com.alibaba.datax</groupId>
 <artifactId>datax-common</artifactId>
@@ -36,18 +50,18 @@
- <dependency>
- <groupId>com.taosdata.jdbc</groupId>
- <artifactId>taos-jdbcdriver</artifactId>
- <version>2.0.34</version>
- </dependency>
 <dependency>
 <groupId>junit</groupId>
 <artifactId>junit</artifactId>
 <version>${junit-version}</version>
 <scope>test</scope>
 </dependency>
+ <dependency>
+ <groupId>com.alibaba.datax</groupId>
+ <artifactId>datax-core</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ <scope>test</scope>
+ </dependency>
 <dependency>
 <groupId>org.apache.commons</groupId>
 <artifactId>commons-lang3</artifactId>
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/ColumnMeta.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/ColumnMeta.java
new file mode 100644
index 00000000..5c77eccd
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/ColumnMeta.java
@@ -0,0 +1,24 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+public class ColumnMeta {
+ String field;
+ String type;
+ int length;
+ String note;
+ boolean isTag;
+ boolean isPrimaryKey;
+ Object value;
+
+ @Override
+ public String toString() {
+ return "ColumnMeta{" +
+ "field='" + field + '\'' +
+ ", type='" + type + '\'' +
+ ", length=" + length +
+ ", note='" + note + '\'' +
+ ", isTag=" + isTag +
+ ", isPrimaryKey=" + isPrimaryKey +
+ ", value=" + value +
+ '}';
+ }
+}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Constants.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Constants.java
new file mode 100644
index 00000000..d62c8f32
--- /dev/null
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Constants.java
@@ -0,0 +1,8 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+public class Constants {
+ public static final String DEFAULT_USERNAME = "root";
+ public static final String DEFAULT_PASSWORD = "taosdata";
+ public static final int DEFAULT_BATCH_SIZE = 1;
+ public static final boolean DEFAULT_IGNORE_TAGS_UNMATCHED = false;
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java
index dcc3ca8c..7afcb080 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandler.java
@@ -1,12 +1,9 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.plugin.RecordReceiver;
-
import com.alibaba.datax.common.plugin.TaskPluginCollector;
-import java.util.Properties;
-
public interface DataHandler {
- long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector);
+ int handle(RecordReceiver lineReceiver, TaskPluginCollector collector);
}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandlerFactory.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandlerFactory.java
deleted file mode 100644
index 1f740d7e..00000000
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DataHandlerFactory.java
+++ /dev/null
@@ -1,10 +0,0 @@
-package com.alibaba.datax.plugin.writer.tdenginewriter;
-
-public class DataHandlerFactory {
-
- public static DataHandler build(String peerPluginName) {
- if (peerPluginName.equals("opentsdbreader"))
- return new OpentsdbDataHandler();
- return new DefaultDataHandler();
- }
-}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java
index a8704f24..16be4c90 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java
@@ -1,108 +1,423 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
+import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
-import com.taosdata.jdbc.TSDBDriver;
-import com.taosdata.jdbc.TSDBPreparedStatement;
+import com.alibaba.datax.common.util.Configuration;
+import com.taosdata.jdbc.SchemalessWriter;
+import com.taosdata.jdbc.enums.SchemalessProtocolType;
+import com.taosdata.jdbc.enums.SchemalessTimestampType;
+import com.taosdata.jdbc.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.sql.SQLException;
-import java.util.Properties;
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
-/**
- * Default DataHandler
- */
public class DefaultDataHandler implements DataHandler {
private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
- static {
- try {
- Class.forName("com.taosdata.jdbc.TSDBDriver");
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
+ private String username;
+ private String password;
+ private String jdbcUrl;
+ private int batchSize;
+ private boolean ignoreTagsUnmatched;
+
+ private List<String> tables;
+ private List<String> columns;
+
+ private Map<String, TableMeta> tableMetas;
+ private SchemaManager schemaManager;
+
+ public void setTableMetas(Map<String, TableMeta> tableMetas) {
+ this.tableMetas = tableMetas;
+ }
+
+ public void setColumnMetas(Map<String, List<ColumnMeta>> columnMetas) {
+ this.columnMetas = columnMetas;
+ }
+
+ public void setSchemaManager(SchemaManager schemaManager) {
+ this.schemaManager = schemaManager;
+ }
+
+ private Map<String, List<ColumnMeta>> columnMetas;
+
+ public DefaultDataHandler(Configuration configuration) {
+ this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME);
+ this.password = configuration.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD);
+ this.jdbcUrl = configuration.getString(Key.JDBC_URL);
+ this.batchSize = configuration.getInt(Key.BATCH_SIZE, Constants.DEFAULT_BATCH_SIZE);
+ this.tables = configuration.getList(Key.TABLE, String.class);
+ this.columns = configuration.getList(Key.COLUMN, String.class);
+ this.ignoreTagsUnmatched = configuration.getBool(Key.IGNORE_TAGS_UNMATCHED, Constants.DEFAULT_IGNORE_TAGS_UNMATCHED);
}
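+
+ // Illustrative task-level configuration consumed by this constructor (table and
+ // column names here are hypothetical):
+ // { "username": "root", "password": "taosdata",
+ //   "jdbcUrl": "jdbc:TAOS-RS://localhost:6041/test",
+ //   "table": ["stb1"], "column": ["tbname", "ts", "f1", "t1"],
+ //   "batchSize": 100, "ignoreTagsUnmatched": true }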
@Override
- public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) {
- SchemaManager schemaManager = new SchemaManager(properties);
- if (!schemaManager.configValid()) {
- return 0;
- }
+ public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) {
+ int count = 0;
+ int affectedRows = 0;
- try {
- Connection conn = getTaosConnection(properties);
- if (conn == null) {
- return 0;
- }
- if (schemaManager.shouldGuessSchema()) {
- // cannot get the table schema from the config file; try to fetch it from the database
- LOG.info(Msg.get("try_get_schema_from_db"));
- boolean success = schemaManager.getFromDB(conn);
- if (!success) {
- return 0;
+ try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
+ LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established.");
+ // prepare table_name -> table_meta
+ this.schemaManager = new SchemaManager(conn);
+ this.tableMetas = schemaManager.loadTableMeta(tables);
+ // prepare table_name -> column_meta
+ this.columnMetas = schemaManager.loadColumnMetas(tables);
+
+ List<Record> recordBatch = new ArrayList<>();
+ Record record;
+ for (int i = 1; (record = lineReceiver.getFromReader()) != null; i++) {
+ recordBatch.add(record);
+ if (i % batchSize == 0) {
+ affectedRows += writeBatch(conn, recordBatch);
+ recordBatch.clear();
+ }
- } else {
-
+ count++;
 }
+ // flush the last partial batch so trailing records are not lost
+ if (!recordBatch.isEmpty()) {
+ affectedRows += writeBatch(conn, recordBatch);
+ recordBatch.clear();
+ }
- int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, "1000"));
- if (batchSize < 5) {
- // batchSize is too small, which increases the chance of automatic type-inference errors; increase it and retry
- LOG.error(Msg.get("batch_size_too_small"));
- return 0;
- }
- return write(lineReceiver, conn, batchSize, schemaManager, collector);
- } catch (Exception e) {
- LOG.error("write failed " + e.getMessage());
- e.printStackTrace();
+ } catch (SQLException e) {
+ throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
- return 0;
- }
-
- private Connection getTaosConnection(Properties properties) throws SQLException {
- // check required parameters
- String host = properties.getProperty(Key.HOST);
- String port = properties.getProperty(Key.PORT);
- String dbname = properties.getProperty(Key.DBNAME);
- String user = properties.getProperty(Key.USER);
- String password = properties.getProperty(Key.PASSWORD);
- if (host == null || port == null || dbname == null || user == null || password == null) {
- String keys = String.join(" ", Key.HOST, Key.PORT, Key.DBNAME, Key.USER, Key.PASSWORD);
- LOG.error("Required options missing, please check: " + keys);
- return null;
+ if (affectedRows != count) {
+ LOG.error("write record missing or incorrect happened, affectedRows: " + affectedRows + ", total: " + count);
}
- String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", host, port, dbname, user, password);
- Properties connProps = new Properties();
- connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
- LOG.info("TDengine connection established, host:{} port:{} dbname:{} user:{}", host, port, dbname, user);
- return DriverManager.getConnection(jdbcUrl, connProps);
+
+ return affectedRows;
}
/**
- * Batch write via SQL.
- *
- * @return the number of records written successfully
- * @throws SQLException
+ * table: [ "stb1", "stb2", "tb1", "tb2", "t1" ]
+ * stb1[ts,f1,f2] tags:[t1]
+ * stb2[ts,f1,f2,f3] tags:[t1,t2]
+ * 1. tables fall into three types: stb (super table) / tb (sub table) / t (ordinary table)
+ * 2. for stb, use automatic table creation / schemaless writing
+ * 2.1: data contains a tbname column, e.g. data: [ts, f1, f2, f3, t1, t2, tbname] tbColumn: [ts, f1, f2, t1] => insert into tbname using stb1 tags(t1) values(ts, f1, f2)
+ * 2.2: data has no tbname column, e.g. data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => schemaless: stb1,t1=t1 f1=f1,f2=f2 ts, no batch writing
+ * 3. for tb, build the SQL directly, e.g. data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => insert into tb(ts, f1, f2) values(ts, f1, f2)
+ * 4. for t, build the SQL directly, e.g. data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, f3, t1, t2] => insert into t(ts, f1, f2, f3, t1, t2) values(ts, f1, f2, f3, t1, t2)
*/
- private long write(RecordReceiver lineReceiver, Connection conn, int batchSize, SchemaManager scm, TaskPluginCollector collector) throws SQLException {
- Record record = lineReceiver.getFromReader();
- if (record == null) {
- return 0;
+ public int writeBatch(Connection conn, List<Record> recordBatch) {
+ int affectedRows = 0;
+ for (String table : tables) {
+ TableMeta tableMeta = tableMetas.get(table);
+ switch (tableMeta.tableType) {
+ case SUP_TABLE: {
+ if (columns.contains("tbname"))
+ affectedRows += writeBatchToSupTableBySQL(conn, table, recordBatch);
+ else
+ affectedRows += writeBatchToSupTableBySchemaless(conn, table, recordBatch);
+ }
+ break;
+ case SUB_TABLE:
+ affectedRows += writeBatchToSubTable(conn, table, recordBatch);
+ break;
+ case NML_TABLE:
+ default:
+ affectedRows += writeBatchToNormalTable(conn, table, recordBatch);
+ }
}
- String pq = String.format("INSERT INTO ? USING %s TAGS(%s) (%s) values (%s)", scm.getStable(), scm.getTagValuesPlaceHolder(), scm.getJoinedFieldNames(), scm.getFieldValuesPlaceHolder());
- LOG.info("Prepared SQL: {}", pq);
- try (TSDBPreparedStatement stmt = (TSDBPreparedStatement) conn.prepareStatement(pq)) {
- JDBCBatchWriter batchWriter = new JDBCBatchWriter(conn, stmt, scm, batchSize, collector);
- do {
- batchWriter.append(record);
- } while ((record = lineReceiver.getFromReader()) != null);
- batchWriter.flush();
- return batchWriter.getCount();
+ return affectedRows;
+ }
+
+ /**
+ * insert into record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
+ * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
+ * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
+ */
+ private int writeBatchToSupTableBySQL(Connection conn, String table, List<Record> recordBatch) {
+ List<ColumnMeta> columnMetas = this.columnMetas.get(table);
+
+ StringBuilder sb = new StringBuilder("insert into");
+ for (Record record : recordBatch) {
+ sb.append(" ").append(record.getColumn(indexOf("tbname")).asString())
+ .append(" using ").append(table)
+ .append(" tags")
+ .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+ return colMeta.isTag;
+ }).map(colMeta -> {
+ return buildColumnValue(colMeta, record);
+ }).collect(Collectors.joining(",", "(", ")")))
+ .append(" ")
+ .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+ return !colMeta.isTag;
+ }).map(colMeta -> {
+ return colMeta.field;
+ }).collect(Collectors.joining(",", "(", ")")))
+ .append(" values")
+ .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+ return !colMeta.isTag;
+ }).map(colMeta -> {
+ return buildColumnValue(colMeta, record);
+ }).collect(Collectors.joining(",", "(", ")")));
+ }
+ String sql = sb.toString();
+
+ return executeUpdate(conn, sql);
+ }
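+
+ // Example SQL produced above for a two-record batch (illustrative names and values):
+ // insert into d1001 using stb1 tags('beijing') (ts,f1) values('2022-01-01 00:00:00.000',10.3)
+ //        d1002 using stb1 tags('shanghai') (ts,f1) values('2022-01-01 00:00:00.000',11.8)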
+
+ private int executeUpdate(Connection conn, String sql) throws DataXException {
+ int count;
+ try (Statement stmt = conn.createStatement()) {
+ LOG.debug(">>> " + sql);
+ count = stmt.executeUpdate(sql);
+ } catch (SQLException e) {
+ throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
+ }
+ return count;
+ }
+
+ private String buildColumnValue(ColumnMeta colMeta, Record record) {
+ Column column = record.getColumn(indexOf(colMeta.field));
+ switch (column.getType()) {
+ case DATE:
+ return "'" + column.asString() + "'";
+ case BYTES:
+ case STRING:
+ if (colMeta.type.equals("TIMESTAMP"))
+ return "\"" + column.asString() + "\"";
+ String value = column.asString();
+ return "\'" + Utils.escapeSingleQuota(value) + "\'";
+ case NULL:
+ case BAD:
+ return "NULL";
+ case BOOL:
+ case DOUBLE:
+ case INT:
+ case LONG:
+ default:
+ return column.asString();
}
}
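+
+ // e.g. a DATE column renders as '2022-01-01 00:00:00.000', a STRING column as 'text'
+ // (single quotes escaped via Utils.escapeSingleQuota), NULL/BAD as the literal NULL,
+ // and numeric/BOOL columns render unquoted.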
+
+ /**
+ * table: ["stb1"], column: ["ts", "f1", "f2", "t1"]
+ * data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => schemaless: stb1,t1=t1 f1=f1,f2=f2 ts
+ */
+ private int writeBatchToSupTableBySchemaless(Connection conn, String table, List<Record> recordBatch) {
+ int count = 0;
+ TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
+
+ List<ColumnMeta> columnMetaList = this.columnMetas.get(table);
+ ColumnMeta ts = columnMetaList.stream().filter(colMeta -> colMeta.isPrimaryKey).findFirst().get();
+
+ List<String> lines = new ArrayList<>();
+ for (Record record : recordBatch) {
+ StringBuilder sb = new StringBuilder();
+ sb.append(table).append(",")
+ .append(columnMetaList.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+ return colMeta.isTag;
+ }).map(colMeta -> {
+ String value = record.getColumn(indexOf(colMeta.field)).asString();
+ if (value.contains(" "))
+ value = value.replace(" ", "\\ ");
+ return colMeta.field + "=" + value;
+ }).collect(Collectors.joining(",")))
+ .append(" ")
+ .append(columnMetaList.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+ return !colMeta.isTag && !colMeta.isPrimaryKey;
+ }).map(colMeta -> {
+ return colMeta.field + "=" + buildSchemalessColumnValue(colMeta, record);
+// return colMeta.field + "=" + record.getColumn(indexOf(colMeta.field)).asString();
+ }).collect(Collectors.joining(",")))
+ .append(" ");
+ // timestamp
+ Column column = record.getColumn(indexOf(ts.field));
+ Object tsValue = column.getRawData();
+ if (column.getType() == Column.Type.DATE && tsValue instanceof Date) {
+ long time = column.asDate().getTime();
+ switch (timestampPrecision) {
+ case NANOSEC:
+ sb.append(time * 1000000);
+ break;
+ case MICROSEC:
+ sb.append(time * 1000);
+ break;
+ case MILLISEC:
+ default:
+ sb.append(time);
+ }
+ } else if (column.getType() == Column.Type.STRING) {
+ sb.append(Utils.parseTimestamp(column.asString()));
+ } else {
+ sb.append(column.asLong());
+ }
+ String line = sb.toString();
+ LOG.debug(">>> " + line);
+ lines.add(line);
+ count++;
+ }
+
+ SchemalessWriter writer = new SchemalessWriter(conn);
+ SchemalessTimestampType timestampType;
+ switch (timestampPrecision) {
+ case NANOSEC:
+ timestampType = SchemalessTimestampType.NANO_SECONDS;
+ break;
+ case MICROSEC:
+ timestampType = SchemalessTimestampType.MICRO_SECONDS;
+ break;
+ case MILLISEC:
+ timestampType = SchemalessTimestampType.MILLI_SECONDS;
+ break;
+ default:
+ timestampType = SchemalessTimestampType.NOT_CONFIGURED;
+ }
+ try {
+ writer.write(lines, SchemalessProtocolType.LINE, timestampType);
+ } catch (SQLException e) {
+ throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
+ }
+
+ LOG.warn("schemalessWriter does not return affected rows!");
+ return count;
+ }
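+
+ // Example line built above (illustrative values): stb1,t1=beijing f1=10.3f64,f2=219i32 1609430400000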
+
+ private long dateAsLong(Column column) {
+ TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
+ long time = column.asDate().getTime();
+ switch (timestampPrecision) {
+ case NANOSEC:
+ return time * 1000000;
+ case MICROSEC:
+ return time * 1000;
+ case MILLISEC:
+ default:
+ return time;
+ }
+ }
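+
+ // e.g. a millisecond epoch 1609430400000 becomes 1609430400000000 in a "us"-precision
+ // database and 1609430400000000000 in an "ns"-precision database.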
+
+ private String buildSchemalessColumnValue(ColumnMeta colMeta, Record record) {
+ Column column = record.getColumn(indexOf(colMeta.field));
+ switch (column.getType()) {
+ case DATE:
+ if (colMeta.type.equals("TIMESTAMP"))
+ return dateAsLong(column) + "i64";
+ return "L'" + column.asString() + "'";
+ case BYTES:
+ case STRING:
+ if (colMeta.type.equals("TIMESTAMP"))
+ return column.asString() + "i64";
+ String value = column.asString();
+ if (colMeta.type.startsWith("BINARY"))
+ return "'" + Utils.escapeSingleQuota(value) + "'";
+ if (colMeta.type.startsWith("NCHAR"))
+ return "L'" + Utils.escapeSingleQuota(value) + "'";
+ case NULL:
+ case BAD:
+ return "NULL";
+ case DOUBLE: {
+ if (colMeta.type.equals("FLOAT"))
+ return column.asString() + "f32";
+ if (colMeta.type.equals("DOUBLE"))
+ return column.asString() + "f64";
+ }
+ case INT:
+ case LONG: {
+ if (colMeta.type.equals("TINYINT"))
+ return column.asString() + "i8";
+ if (colMeta.type.equals("SMALLINT"))
+ return column.asString() + "i16";
+ if (colMeta.type.equals("INT"))
+ return column.asString() + "i32";
+ if (colMeta.type.equals("BIGINT"))
+ return column.asString() + "i64";
+ }
+ case BOOL:
+ default:
+ return column.asString();
+ }
+ }
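+
+ // The suffixes above follow TDengine's schemaless line protocol typing: i8/i16/i32/i64
+ // for integers, f32/f64 for floats, 'x' for BINARY, L'x' for NCHAR, and i64 epochs for
+ // TIMESTAMP values.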
+
+ /**
+ * table: ["tb1"], column: [tbname, ts, f1, f2, t1]
+ * if contains("tbname") and tbname != tb1 continue;
+ * else if t1 != record[idx(t1)] or t2 != record[idx(t2)]... continue;
+ * else
+ * insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)])
+ */
+ private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) {
+ List<ColumnMeta> columnMetas = this.columnMetas.get(table);
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("insert into ").append(table).append(" ")
+ .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+ return !colMeta.isTag;
+ }).map(colMeta -> {
+ return colMeta.field;
+ }).collect(Collectors.joining(",", "(", ")")))
+ .append(" values");
+ int validRecords = 0;
+ for (Record record : recordBatch) {
+ if (columns.contains("tbname") && !table.equals(record.getColumn(indexOf("tbname")).asString()))
+ continue;
+
+ boolean tagsAllMatch = columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+ return colMeta.isTag;
+ }).allMatch(colMeta -> {
+ return record.getColumn(indexOf(colMeta.field)).asString().equals(colMeta.value.toString());
+ });
+
+ if (ignoreTagsUnmatched && !tagsAllMatch)
+ continue;
+
+ sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
+ return !colMeta.isTag;
+ }).map(colMeta -> {
+ return buildColumnValue(colMeta, record);
+ }).collect(Collectors.joining(", ", "(", ") ")));
+ validRecords++;
+ }
+
+ if (validRecords == 0) {
+ LOG.warn("no valid records in this batch");
+ return 0;
+ }
+
+ String sql = sb.toString();
+ return executeUpdate(conn, sql);
+ }
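+
+ // Example SQL produced above (illustrative values): insert into tb1 (ts,f1) values('2022-01-01 00:00:00.000', 1) ('2022-01-01 00:00:01.000', 2)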
+
+ /**
+ * table: ["weather"], column: ["ts, f1, f2, f3, t1, t2"]
+ * sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...)
+ */
+ private int writeBatchToNormalTable(Connection conn, String table, List<Record> recordBatch) {
+ List<ColumnMeta> columnMetas = this.columnMetas.get(table);
+
+ StringBuilder sb = new StringBuilder();
+ sb.append("insert into ").append(table)
+ .append(" ")
+ .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
+ return colMeta.field;
+ }).collect(Collectors.joining(",", "(", ")")))
+ .append(" values ");
+
+ for (Record record : recordBatch) {
+ sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
+ return buildColumnValue(colMeta, record);
+ }).collect(Collectors.joining(",", "(", ")")));
+ }
+
+ String sql = sb.toString();
+ return executeUpdate(conn, sql);
+ }
+
+ private int indexOf(String colName) throws DataXException {
+ for (int i = 0; i < columns.size(); i++) {
+ if (columns.get(i).equals(colName))
+ return i;
+ }
+ throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION,
+ "cannot find col: " + colName + " in columns: " + columns);
+ }
+
}
\ No newline at end of file
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java
deleted file mode 100644
index 53ab9bb9..00000000
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JDBCBatchWriter.java
+++ /dev/null
@@ -1,244 +0,0 @@
-package com.alibaba.datax.plugin.writer.tdenginewriter;
-
-import com.alibaba.datax.common.element.Column;
-import com.alibaba.datax.common.element.Record;
-import com.alibaba.datax.common.exception.DataXException;
-import com.alibaba.datax.common.plugin.TaskPluginCollector;
-import com.taosdata.jdbc.TSDBPreparedStatement;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.sql.Connection;
-import java.sql.SQLException;
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.stream.Collectors;
-
-/**
- * Batch writing through the native JDBC write API.
- * Two constraints make the batch-write logic complex enough to deserve a dedicated wrapper class:
- * 1. Users must collect the records to be batch-written into an ArrayList in advance.
- * 2. All records in one batch must target the same table.
- * The implementation works as follows:
- * 1. Records belonging to the same sub-table are buffered first.
- * 2. Once a buffer reaches the batchSize threshold, a batch write is executed automatically.
- * 3. The last batch is only written when the user calls flush() manually.
- */
-public class JDBCBatchWriter {
- public static final Logger LOG = LoggerFactory.getLogger(JDBCBatchWriter.class);
- private TSDBPreparedStatement stmt;
- private SchemaManager scm;
- private Connection conn;
- private int batchSize;
- private TaskPluginCollector collector;
-
- // buffered Records, keyed by table name
- Map<String, List<Record>> buf = new HashMap<>();
- // cached tag values of each table, keyed by table name
- Map<String, String[]> tableTagValues = new HashMap<>();
- private long sucCount = 0;
- private final int tsColIndex;
- private List<String> fieldList;
- // the minimum number of columns each record should contain, used to validate the data
- private int minColNum = 0;
- private Map<String, Integer> fieldIndexMap;
- private List<Column.Type> fieldTypes = null;
-
- public JDBCBatchWriter(Connection conn, TSDBPreparedStatement stmt, SchemaManager scm, int batchSize, TaskPluginCollector collector) {
- this.conn = conn;
- this.stmt = stmt;
- this.scm = scm;
- this.batchSize = batchSize;
- this.collector = collector;
- this.tsColIndex = scm.getTsColIndex();
- this.fieldList = scm.getFieldList();
- this.fieldIndexMap = scm.getFieldIndexMap();
- this.minColNum = 1 + fieldList.size() + scm.getDynamicTagCount();
-
- }
-
- public void initFiledTypesAndTargetTable(List<Record> records) throws SQLException {
- if (fieldTypes != null) {
- return;
- }
- guessFieldTypes(records);
- if (scm.shouldCreateTable()) {
- scm.createSTable(conn, fieldTypes);
- }
- }
-
- public void append(Record record) throws SQLException {
- int columnNum = record.getColumnNumber();
- if (columnNum < minColNum) {
- // the actual column count is less than expected
- collector.collectDirtyRecord(record, Msg.get("column_number_error"));
- return;
- }
- String[] tagValues = scm.getTagValuesFromRecord(record);
- if (tagValues == null) {
- // a tag column contains null
- collector.collectDirtyRecord(record, Msg.get("tag_value_error"));
- return;
- }
- if (!scm.hasTimestamp(record)) {
- // the timestamp column is null or has the wrong type
- collector.collectDirtyRecord(record, Msg.get("ts_value_error"));
- return;
- }
- String tableName = scm.computeTableName(tagValues);
- if (buf.containsKey(tableName)) {
- List<Record> lis = buf.get(tableName);
- lis.add(record);
- if (lis.size() == batchSize) {
- if (fieldTypes == null) {
- initFiledTypesAndTargetTable(lis);
- }
- executeBatch(tableName);
- lis.clear();
- }
- } else {
- List<Record> lis = new ArrayList<>(batchSize);
- lis.add(record);
- buf.put(tableName, lis);
- tableTagValues.put(tableName, tagValues);
- }
- }
-
- /**
- * Only the String type is special: testing shows that columns whose value is null are converted to String, so a Column of type String does not necessarily mean the column's real type is String.
- *
- * @param records
- */
- private void guessFieldTypes(List<Record> records) {
- fieldTypes = new ArrayList<>(fieldList.size());
- for (int i = 0; i < fieldList.size(); ++i) {
- int colIndex = fieldIndexMap.get(fieldList.get(i));
- boolean ok = false;
- for (int j = 0; j < records.size() && !ok; ++j) {
- Column column = records.get(j).getColumn(colIndex);
- Column.Type type = column.getType();
- switch (type) {
- case LONG:
- case DOUBLE:
- case DATE:
- case BOOL:
- case BYTES:
- if (column.getRawData() != null) {
- fieldTypes.add(type);
- ok = true;
- }
- break;
- case STRING:
- // only non-null and non-empty String columns are really treated as String
- String value = column.asString();
- if (value != null && !"".equals(value)) {
- fieldTypes.add(type);
- ok = true;
- }
- break;
- default:
- throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, fieldTypes.get(i).toString());
- }
- }
- if (!ok) {
- // based on the %d sampled records, the data type of column %d cannot be inferred
- throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, String.format(Msg.get("infer_column_type_error"), records.size(), i + 1));
- }
- }
- LOG.info("Field Types: {}", fieldTypes);
- }
-
- /**
- * Execute a batch write for a single table.
- *
- * @param tableName
- * @throws SQLException
- */
- private void executeBatch(String tableName) throws SQLException {
- // table name
- stmt.setTableName(tableName);
- List<Record> records = buf.get(tableName);
- // tags
- String[] tagValues = tableTagValues.get(tableName);
- LOG.debug("executeBatch {}", String.join(",", tagValues));
- for (int i = 0; i < tagValues.length; ++i) {
- stmt.setTagNString(i, tagValues[i]);
- }
- // timestamps
- ArrayList<Long> tsList = records.stream().map(r -> r.getColumn(tsColIndex).asDate().getTime()).collect(Collectors.toCollection(ArrayList::new));
- stmt.setTimestamp(0, tsList);
- // fields
- for (int i = 0; i < fieldList.size(); ) {
- String fieldName = fieldList.get(i);
- int index = fieldIndexMap.get(fieldName);
- switch (fieldTypes.get(i)) {
- case LONG:
- ArrayList<Long> lisLong = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new));
- stmt.setLong(++i, lisLong);
- break;
- case DOUBLE:
- ArrayList<Double> lisDouble = records.stream().map(r -> r.getColumn(index).asDouble()).collect(Collectors.toCollection(ArrayList::new));
- stmt.setDouble(++i, lisDouble);
- break;
- case STRING:
- ArrayList<String> lisString = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new));
- stmt.setNString(++i, lisString, 64);
- break;
- case DATE:
- ArrayList<Long> lisTs = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new));
- stmt.setTimestamp(++i, lisTs);
- break;
- case BOOL:
- ArrayList<Boolean> lisBool = records.stream().map(r -> r.getColumn(index).asBoolean()).collect(Collectors.toCollection(ArrayList::new));
- stmt.setBoolean(++i, lisBool);
- break;
- case BYTES:
- ArrayList<String> lisBytes = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new));
- stmt.setString(++i, lisBytes, 64);
- break;
- default:
- throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, fieldTypes.get(i).toString());
- }
- }
- // execute
- stmt.columnDataAddBatch();
- stmt.columnDataExecuteBatch();
- // update the success counter
- sucCount += records.size();
- }
-
- /**
- * Write out all buffered Records.
- */
- public void flush() throws SQLException {
- if (fieldTypes == null) {
- List<Record> records = new ArrayList<>();
- for (List<Record> lis : buf.values()) {
- records.addAll(lis);
- if (records.size() > 100) {
- break;
- }
- }
- if (records.size() > 0) {
- initFiledTypesAndTargetTable(records);
- } else {
- return;
- }
- }
- for (String tabName : buf.keySet()) {
- if (buf.get(tabName).size() > 0) {
- executeBatch(tabName);
- }
- }
- stmt.columnDataCloseBatch();
- }
-
- /**
- * @return the number of records written successfully
- */
- public long getCount() {
- return sucCount;
- }
-}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnection.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnection.java
deleted file mode 100644
index 0aabe32a..00000000
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/JniConnection.java
+++ /dev/null
@@ -1,89 +0,0 @@
-package com.alibaba.datax.plugin.writer.tdenginewriter;
-
-import java.util.Properties;
-
-public class JniConnection {
-
- private static final long JNI_NULL_POINTER = 0L;
- private static final int JNI_SUCCESSFUL = 0;
- public static final String PROPERTY_KEY_CONFIG_DIR = "cfgdir";
- public static final String PROPERTY_KEY_LOCALE = "locale";
- public static final String PROPERTY_KEY_CHARSET = "charset";
- public static final String PROPERTY_KEY_TIME_ZONE = "timezone";
-
- private long conn;
-
- static {
- System.loadLibrary("taos");
- }
-
- public JniConnection(Properties props) throws Exception {
- initImp(props.getProperty(PROPERTY_KEY_CONFIG_DIR, null));
-
- String locale = props.getProperty(PROPERTY_KEY_LOCALE);
- if (setOptions(0, locale) < 0) {
- throw new Exception("Failed to set locale: " + locale + ". System default will be used.");
- }
- String charset = props.getProperty(PROPERTY_KEY_CHARSET);
- if (setOptions(1, charset) < 0) {
- throw new Exception("Failed to set charset: " + charset + ". System default will be used.");
- }
- String timezone = props.getProperty(PROPERTY_KEY_TIME_ZONE);
- if (setOptions(2, timezone) < 0) {
- throw new Exception("Failed to set timezone: " + timezone + ". System default will be used.");
- }
- }
-
- public void open(String host, int port, String dbname, String user, String password) throws Exception {
- if (this.conn != JNI_NULL_POINTER) {
- close();
- this.conn = JNI_NULL_POINTER;
- }
-
- this.conn = connectImp(host, port, dbname, user, password);
- if (this.conn == JNI_NULL_POINTER) {
- String errMsg = getErrMsgImp(0);
- throw new Exception(errMsg);
- }
- }
-
- public void insertOpentsdbJson(String json) throws Exception {
- if (this.conn == JNI_NULL_POINTER) {
- throw new Exception("JNI connection is NULL");
- }
-
- long result = insertOpentsdbJson(json, this.conn);
- int errCode = getErrCodeImp(this.conn, result);
- if (errCode != JNI_SUCCESSFUL) {
- String errMsg = getErrMsgImp(result);
- freeResultSetImp(this.conn, result);
- throw new Exception(errMsg);
- }
- freeResultSetImp(this.conn, result);
- }
-
- public void close() throws Exception {
- int code = this.closeConnectionImp(this.conn);
- if (code != 0) {
- throw new Exception("JNI closeConnection failed");
- }
- this.conn = JNI_NULL_POINTER;
- }
-
- private static native void initImp(String configDir);
-
- private static native int setOptions(int optionIndex, String optionValue);
-
- private native long connectImp(String host, int port, String dbName, String user, String password);
-
- private native int getErrCodeImp(long connection, long pSql);
-
- private native String getErrMsgImp(long pSql);
-
- private native void freeResultSetImp(long connection, long pSql);
-
- private native int closeConnectionImp(long connection);
-
- private native long insertOpentsdbJson(String json, long pSql);
-
-}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java
index 7fb383e6..1d7ee214 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Key.java
@@ -1,14 +1,12 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
public class Key {
- public static final String HOST = "host";
- public static final String PORT = "port";
- public static final String DBNAME = "dbName";
- public static final String USER = "username";
+ public static final String USERNAME = "username";
public static final String PASSWORD = "password";
+ public static final String CONNECTION = "connection";
public static final String BATCH_SIZE = "batchSize";
- public static final String STABLE = "stable";
- public static final String TAG_COLUMN = "tagColumn";
- public static final String FIELD_COLUMN = "fieldColumn";
- public static final String TIMESTAMP_COLUMN = "timestampColumn";
-}
+ public static final String TABLE = "table";
+ public static final String JDBC_URL = "jdbcUrl";
+ public static final String COLUMN = "column";
+ public static final String IGNORE_TAGS_UNMATCHED = "ignoreTagsUnmatched";
+}
\ No newline at end of file
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java
index e1b8f5dd..2fb5a98f 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/OpentsdbDataHandler.java
@@ -5,50 +5,50 @@ import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
+import com.alibaba.datax.common.util.Configuration;
+import com.taosdata.jdbc.SchemalessWriter;
+import com.taosdata.jdbc.enums.SchemalessProtocolType;
+import com.taosdata.jdbc.enums.SchemalessTimestampType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.Properties;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
public class OpentsdbDataHandler implements DataHandler {
private static final Logger LOG = LoggerFactory.getLogger(OpentsdbDataHandler.class);
- private static final String DEFAULT_BATCH_SIZE = "1";
+ private SchemalessWriter writer;
+
+ private String jdbcUrl;
+ private String user;
+ private String password;
+ int batchSize;
+
+ public OpentsdbDataHandler(Configuration config) {
+ // the OpenTSDB JSON protocol is written via the JDBC schemaless API
+ this.jdbcUrl = config.getString(Key.JDBC_URL);
+ this.user = config.getString(Key.USERNAME, "root");
+ this.password = config.getString(Key.PASSWORD, "taosdata");
+ this.batchSize = config.getInt(Key.BATCH_SIZE, Constants.DEFAULT_BATCH_SIZE);
+ }
@Override
- public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) {
- // opentsdb json protocol use JNI and schemaless API to write
- String host = properties.getProperty(Key.HOST);
- int port = Integer.parseInt(properties.getProperty(Key.PORT));
- String dbname = properties.getProperty(Key.DBNAME);
- String user = properties.getProperty(Key.USER);
- String password = properties.getProperty(Key.PASSWORD);
-
- JniConnection conn = null;
- long count = 0;
- try {
- conn = new JniConnection(properties);
- conn.open(host, port, dbname, user, password);
- LOG.info("TDengine connection established, host: " + host + ", port: " + port + ", dbname: " + dbname + ", user: " + user);
- int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, DEFAULT_BATCH_SIZE));
- count = writeOpentsdb(lineReceiver, conn, batchSize);
+ public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) {
+ int count = 0;
+ try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password)) {
+ LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + user + "] established.");
+ writer = new SchemalessWriter(conn);
+ count = write(lineReceiver, batchSize);
} catch (Exception e) {
- LOG.error(e.getMessage());
- e.printStackTrace();
- } finally {
- try {
- if (conn != null)
- conn.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
- LOG.info("TDengine connection closed");
+ throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e);
}
return count;
}
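+
+ // Records are buffered into a JSON array of batchSize elements, e.g. (illustrative):
+ // [{"metric":"cpu","timestamp":1609430400000,"value":0.5,"tags":{"host":"web01"}}, ...]
+ // and each array is flushed through SchemalessWriter with SchemalessProtocolType.JSON.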
- private long writeOpentsdb(RecordReceiver lineReceiver, JniConnection conn, int batchSize) {
- long recordIndex = 1;
+ private int write(RecordReceiver lineReceiver, int batchSize) throws DataXException {
+ int recordIndex = 1;
try {
Record record;
StringBuilder sb = new StringBuilder();
@@ -56,14 +56,14 @@ public class OpentsdbDataHandler implements DataHandler {
if (batchSize == 1) {
String jsonData = recordToString(record);
LOG.debug(">>> " + jsonData);
- conn.insertOpentsdbJson(jsonData);
+ writer.write(jsonData, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
} else if (recordIndex % batchSize == 1) {
sb.append("[").append(recordToString(record)).append(",");
} else if (recordIndex % batchSize == 0) {
sb.append(recordToString(record)).append("]");
String jsonData = sb.toString();
LOG.debug(">>> " + jsonData);
- conn.insertOpentsdbJson(jsonData);
+ writer.write(jsonData, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
sb.delete(0, sb.length());
} else {
sb.append(recordToString(record)).append(",");
@@ -72,11 +72,11 @@ public class OpentsdbDataHandler implements DataHandler {
}
if (sb.length() != 0 && sb.charAt(0) == '[') {
String jsonData = sb.deleteCharAt(sb.length() - 1).append("]").toString();
LOG.debug(">>> " + jsonData);
- conn.insertOpentsdbJson(jsonData);
+ writer.write(jsonData, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
}
} catch (Exception e) {
- LOG.error("TDengineWriter ERROR: " + e.getMessage());
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e);
}
return recordIndex - 1;
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java
index d67a6585..877b9b6d 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java
@@ -1,9 +1,7 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
-import com.alibaba.datax.common.element.Column;
-import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
-import org.apache.commons.codec.digest.DigestUtils;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -12,260 +10,163 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
-import java.util.stream.Collectors;
public class SchemaManager {
private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class);
- private String stable; // target super table name
- private Map<String, String> fixedTagValue = new HashMap<>(); // fixed tag values: tag name -> tag value
- private Map<String, Integer> tagIndexMap = new HashMap<>(); // dynamic tag values: tag name -> column index
- private Map<String, Integer> fieldIndexMap = new HashMap<>(); // field name -> field index
- private String tsColName; // timestamp column name
- private int tsColIndex = -1; // timestamp column index
- private List<String> fieldList = new ArrayList<>();
- private List<String> tagList = new ArrayList<>();
- private boolean canInferSchemaFromConfig = false;
+ private final Connection conn;
+ private TimestampPrecision precision;
-
- public SchemaManager() {
+ SchemaManager(Connection conn) {
+ this.conn = conn;
}
- public SchemaManager(Properties properties) {
- getFromConfig(properties);
- }
+ public TimestampPrecision loadDatabasePrecision() throws DataXException {
+ if (this.precision != null)
+ return this.precision;
- private String mapDataxType(Column.Type type) {
- switch (type) {
- case LONG:
- return "BIGINT";
- case DOUBLE:
- return "DOUBLE";
- case STRING:
- return "NCHAR(64)";
- case DATE:
- return "TIMESTAMP";
- case BOOL:
- return "BOOL";
- case BYTES:
- return "BINARY(64)";
- default:
- throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, type.toString());
- }
- }
-
- public void setStable(String stable) {
- stable = stable;
- }
-
- public String getStable() {
- return stable;
- }
-
- private void getFromConfig(Properties properties) {
- stable = properties.getProperty(Key.STABLE);
- if (stable == null) {
- LOG.error("Config error: no stable");
- return;
- }
- for (Object key : properties.keySet()) {
- String k = (String) key;
- String v = properties.getProperty(k);
-
- String[] ps = k.split("\\.");
- if (ps.length == 1) {
- continue;
- }
- if (k.startsWith(Key.TAG_COLUMN)) {
- String tagName = ps[1];
- try {
- Integer tagIndex = Integer.parseInt(v);
- this.tagIndexMap.put(tagName, tagIndex);
- tagList.add(tagName);
- } catch (NumberFormatException e) {
- fixedTagValue.put(tagName, v);
- tagList.add(tagName);
- }
- } else if (k.startsWith(Key.FIELD_COLUMN)) {
- String fieldName = ps[1];
- Integer fileIndex = Integer.parseInt(v);
- fieldIndexMap.put(fieldName, fileIndex);
- } else if (k.startsWith(Key.TIMESTAMP_COLUMN)) {
- tsColName = ps[1];
- tsColIndex = Integer.parseInt(v);
- }
- }
- List<String> sortedFieldName = fieldIndexMap.entrySet().stream().sorted((x, y) -> x.getValue().compareTo(y.getValue())).map(e -> e.getKey()).collect(Collectors.toList());
- fieldList.addAll(sortedFieldName); // sorting ensures the column order used for auto table creation matches the column order of the input data
- canInferSchemaFromConfig = tsColIndex > -1 && !(fixedTagValue.isEmpty() && tagIndexMap.isEmpty()) && !fieldIndexMap.isEmpty();
- LOG.info("Config file parsed result:fixedTags=[{}] ,tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", fixedTagValue.keySet()), String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex);
- }
-
- public boolean shouldGuessSchema() {
- return !canInferSchemaFromConfig;
- }
-
- public boolean shouldCreateTable() {
- return canInferSchemaFromConfig;
- }
-
- public boolean configValid() {
- boolean valid = (tagList.size() > 0 && fieldList.size() > 0 && tsColIndex > -1) || (tagList.size() == 0 && fieldList.size() == 0 && tsColIndex == -1);
- if (!valid) {
- LOG.error("Config error: tagColumn, fieldColumn and timestampColumn must be present together or absent together.");
- }
- return valid;
- }
-
- /**
- * Get the table schema by executing the `describe dbname.stable` command.
- * The describe command returns 4 columns: Field, Type, Length, Note.
- *
- * @return true on success; false if the super table does not exist or another error occurs
- */
- public boolean getFromDB(Connection conn) {
- try {
- List<String> stables = getSTables(conn);
- if (!stables.contains(stable)) {
- LOG.error("super table {} not exist, fail to get schema from database.", stable);
- return false;
- }
- } catch (SQLException e) {
- LOG.error(e.getMessage());
- e.printStackTrace();
- return false;
- }
try (Statement stmt = conn.createStatement()) {
- ResultSet rs = stmt.executeQuery("describe " + stable);
- int colIndex = 0;
+ ResultSet rs = stmt.executeQuery("select database()");
+ String dbname = null;
while (rs.next()) {
- String name = rs.getString(1);
- String type = rs.getString(2);
- String note = rs.getString(4);
- if ("TIMESTAMP".equals(type)) {
- tsColName = name;
- tsColIndex = colIndex;
- } else if ("TAG".equals(note)) {
- tagIndexMap.put(name, colIndex);
- tagList.add(name);
- } else {
- fieldIndexMap.put(name, colIndex);
- fieldList.add(name);
- }
- colIndex++;
+ dbname = rs.getString("database()");
+ }
+ if (dbname == null)
+ throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION,
+ "Database not specified or available");
+
+ rs = stmt.executeQuery("show databases");
+ while (rs.next()) {
+ String name = rs.getString("name");
+ if (!name.equalsIgnoreCase(dbname))
+ continue;
+ String precision = rs.getString("precision");
+ switch (precision) {
+ case "ns":
+ this.precision = TimestampPrecision.NANOSEC;
+ break;
+ case "us":
+ this.precision = TimestampPrecision.MICROSEC;
+ break;
+ case "ms":
+ default:
+ this.precision = TimestampPrecision.MILLISEC;
+ }
}
- LOG.info("table info:tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex);
- return true;
} catch (SQLException e) {
- LOG.error(e.getMessage());
- e.printStackTrace();
- return false;
+ throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
+ return this.precision;
}
- public static List<String> getSTables(Connection conn) throws SQLException {
- List<String> stables = new ArrayList<>();
+ public Map<String, TableMeta> loadTableMeta(List<String> tables) throws DataXException {
+ Map<String, TableMeta> tableMetas = new HashMap<>();
+
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("show stables");
while (rs.next()) {
- String name = rs.getString(1);
- stables.add(name);
+ TableMeta tableMeta = buildSupTableMeta(rs);
+ if (!tables.contains(tableMeta.tbname))
+ continue;
+ tableMetas.put(tableMeta.tbname, tableMeta);
}
- }
- return stables;
- }
- public void createSTable(Connection conn, List<Column.Type> fieldTypes) throws SQLException {
- StringBuilder sb = new StringBuilder();
- sb.append("CREATE STABLE IF NOT EXISTS ").append(stable).append("(");
- sb.append(tsColName).append(" ").append("TIMESTAMP,");
- for (int i = 0; i < fieldList.size(); ++i) {
- String fieldName = fieldList.get(i);
- Column.Type dxType = fieldTypes.get(i);
- sb.append(fieldName).append(' ');
- String tdType = mapDataxType(dxType);
- sb.append(tdType).append(',');
- }
- sb.deleteCharAt(sb.length() - 1);
- sb.append(") TAGS(");
- for (String tagName : tagList) {
- sb.append(tagName).append(" NCHAR(64),");
- }
- sb.deleteCharAt(sb.length() - 1);
- sb.append(")");
- String q = sb.toString();
- LOG.info("run sql:" + q);
- try (Statement stmt = conn.createStatement()) {
- stmt.execute(q);
- }
- }
-
- public String[] getTagValuesFromRecord(Record record) {
- String[] tagValues = new String[tagList.size()];
- for (int i = 0; i < tagList.size(); ++i) {
- if (fixedTagValue.containsKey(tagList.get(i))) {
- tagValues[i] = fixedTagValue.get(tagList.get(i));
- } else {
- int tagIndex = tagIndexMap.get(tagList.get(i));
- tagValues[i] = record.getColumn(tagIndex).asString();
+ rs = stmt.executeQuery("show tables");
+ while (rs.next()) {
+ TableMeta tableMeta = buildSubTableMeta(rs);
+ if (!tables.contains(tableMeta.tbname))
+ continue;
+ tableMetas.put(tableMeta.tbname, tableMeta);
}
- if (tagValues[i] == null) {
- return null;
+
+ for (String tbname : tables) {
+ if (!tableMetas.containsKey(tbname)) {
+ LOG.error("table metadata of " + tbname + " is empty!");
+ }
}
+ } catch (SQLException e) {
+ throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
- return tagValues;
+ return tableMetas;
}
- public boolean hasTimestamp(Record record) {
- Column column = record.getColumn(tsColIndex);
- if (column.getType() == Column.Type.DATE && column.asDate() != null) {
- return true;
- } else {
- return false;
+ public Map<String, List<ColumnMeta>> loadColumnMetas(List<String> tables) throws DataXException {
+ Map<String, List<ColumnMeta>> ret = new HashMap<>();
+
+ for (String table : tables) {
+ List<ColumnMeta> columnMetaList = new ArrayList<>();
+ try (Statement stmt = conn.createStatement()) {
+ ResultSet rs = stmt.executeQuery("describe " + table);
+ for (int i = 0; rs.next(); i++) {
+ ColumnMeta columnMeta = buildColumnMeta(rs, i == 0);
+ columnMetaList.add(columnMeta);
+ }
+ } catch (SQLException e) {
+ throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
+ }
+
+ if (columnMetaList.isEmpty()) {
+ LOG.error("column metadata of " + table + " is empty!");
+ continue;
+ }
+
+ columnMetaList.stream().filter(colMeta -> colMeta.isTag).forEach(colMeta -> {
+ String sql = "select " + colMeta.field + " from " + table;
+ Object value = null;
+ try (Statement stmt = conn.createStatement()) {
+ ResultSet rs = stmt.executeQuery(sql);
+ for (int i = 0; rs.next(); i++) {
+ value = rs.getObject(colMeta.field);
+ if (i > 0) {
+ value = null;
+ break;
+ }
+ }
+ } catch (SQLException e) {
+ e.printStackTrace();
+ }
+ colMeta.value = value;
+ });
+
+ LOG.debug("load column metadata of " + table + ": " + Arrays.toString(columnMetaList.toArray()));
+ ret.put(table, columnMetaList);
}
+ return ret;
}
- public Map<String, Integer> getFieldIndexMap() {
- return fieldIndexMap;
+ private TableMeta buildSupTableMeta(ResultSet rs) throws SQLException {
+ TableMeta tableMeta = new TableMeta();
+ tableMeta.tableType = TableType.SUP_TABLE;
+ tableMeta.tbname = rs.getString("name");
+ tableMeta.columns = rs.getInt("columns");
+ tableMeta.tags = rs.getInt("tags");
+ tableMeta.tables = rs.getInt("tables");
+
+ LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta);
+ return tableMeta;
}
- public List<String> getFieldList() {
- return fieldList;
+ private TableMeta buildSubTableMeta(ResultSet rs) throws SQLException {
+ TableMeta tableMeta = new TableMeta();
+ String stable_name = rs.getString("stable_name");
+ tableMeta.tableType = StringUtils.isBlank(stable_name) ? TableType.NML_TABLE : TableType.SUB_TABLE;
+ tableMeta.tbname = rs.getString("table_name");
+ tableMeta.columns = rs.getInt("columns");
+ tableMeta.stable_name = StringUtils.isBlank(stable_name) ? null : stable_name;
+
+ LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta);
+ return tableMeta;
}
- public String getJoinedFieldNames() {
- return tsColName + ", " + String.join(", ", fieldList);
+ private ColumnMeta buildColumnMeta(ResultSet rs, boolean isPrimaryKey) throws SQLException {
+ ColumnMeta columnMeta = new ColumnMeta();
+ columnMeta.field = rs.getString("Field");
+ columnMeta.type = rs.getString("Type");
+ columnMeta.length = rs.getInt("Length");
+ columnMeta.note = rs.getString("Note");
+ columnMeta.isTag = columnMeta.note != null && columnMeta.note.equals("TAG");
+ columnMeta.isPrimaryKey = isPrimaryKey;
+ return columnMeta;
}
- public int getTsColIndex() {
- return tsColIndex;
- }
-
- public String getTagValuesPlaceHolder() {
- return tagList.stream().map(x -> "?").collect(Collectors.joining(","));
- }
-
- public String getFieldValuesPlaceHolder() {
- return "?, " + fieldList.stream().map(x -> "?").collect(Collectors.joining(", "));
- }
-
- /**
- * Compute the sub-table name:
- *
- * - join the tag values into a string of the form tag_value1!tag_value2!tag_value3
- * - compute the MD5 hash "md5_val" of that string
- * - use "t_md5val" as the sub-table name, where "t_" is a fixed prefix
- *
- *
- * @param tagValues
- * @return
- */
- public String computeTableName(String[] tagValues) {
- String s = String.join("!", tagValues);
- return "t_" + DigestUtils.md5Hex(s);
- }
-
- public int getDynamicTagCount() {
- return tagIndexMap.size();
- }
}
diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java
index 79e5238c..7cc76a77 100644
--- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java
+++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java
@@ -1,37 +1,56 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
-
+import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
-import java.util.Properties;
-import java.util.Set;
public class TDengineWriter extends Writer {
private static final String PEER_PLUGIN_NAME = "peerPluginName";
- static {
- try {
- Class.forName("com.taosdata.jdbc.TSDBDriver");
- } catch (ClassNotFoundException e) {
- e.printStackTrace();
- }
- }
-
public static class Job extends Writer.Job {
private Configuration originalConfig;
+ private static final Logger LOG = LoggerFactory.getLogger(Job.class);
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
this.originalConfig.set(PEER_PLUGIN_NAME, getPeerPluginName());
+
+ // check user
+ String user = this.originalConfig.getString(Key.USERNAME);
+ if (StringUtils.isBlank(user))
+ throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter ["
+ + Key.USERNAME + "] is not set.");
+
+ // check password
+ String password = this.originalConfig.getString(Key.PASSWORD);
+ if (StringUtils.isBlank(password))
+ throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter ["
+ + Key.PASSWORD + "] is not set.");
+
+ // check connection
+ List