diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java index 849486fd..97f965c9 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java @@ -9,6 +9,7 @@ import com.alibaba.datax.common.util.Configuration; import com.taosdata.jdbc.SchemalessWriter; import com.taosdata.jdbc.enums.SchemalessProtocolType; import com.taosdata.jdbc.enums.SchemalessTimestampType; +import com.taosdata.jdbc.utils.StringUtils; import com.taosdata.jdbc.utils.Utils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -76,12 +77,26 @@ public class DefaultDataHandler implements DataHandler { try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) { LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established."); + String dbname = getDbnameFromJdbcUrl(jdbcUrl); + Statement statement = conn.createStatement(); + ResultSet resultSet = statement.executeQuery("select " + Constants.SERVER_VERSION); + // 光标移动一行 + resultSet.next(); + String serverVersion = resultSet.getString(Constants.SERVER_VERSION); + if (StringUtils.isEmpty(dbname)) { + throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, "error dbname parsed from jdbcUrl"); + } + LOG.info("tdengine server version[{}] dbname[{}]", serverVersion, dbname); + // 版本判断确定使用 SchemaManager 的示例 + if (serverVersion.startsWith(Constants.SERVER_VERSION_2)) { + this.schemaManager = new SchemaManager(conn); + } else { + this.schemaManager = new Schema3_0Manager(conn, dbname); + } // prepare table_name -> table_meta - this.schemaManager = new SchemaManager(conn); this.tableMetas = schemaManager.loadTableMeta(tables); // prepare table_name -> column_meta this.tbnameColumnMetasMap = schemaManager.loadColumnMetas(tables); - // filter column for (String tableName : tbnameColumnMetasMap.keySet()) { List columnMetaList = tbnameColumnMetasMap.get(tableName); @@ -132,6 +147,20 @@ public class DefaultDataHandler implements DataHandler { return affectedRows; } + /** + * 从 jdbcUrl 中解析出数据库名称 + * @param jdbcUrl 格式是 jdbc:://:/[?可选参数] + * @return 数据库名称 + */ + private static String getDbnameFromJdbcUrl(String jdbcUrl) { + int questionMarkIndex = -1; + if (jdbcUrl.contains("?")) { + questionMarkIndex = jdbcUrl.indexOf("?"); + } + return questionMarkIndex == -1 ? jdbcUrl.substring(jdbcUrl.lastIndexOf("/") + 1) : + jdbcUrl.substring(jdbcUrl.lastIndexOf("/") + 1, questionMarkIndex); + } + private int writeEachRow(Connection conn, List recordBatch) { int affectedRows = 0; for (Record record : recordBatch) { @@ -465,7 +494,7 @@ public class DefaultDataHandler implements DataHandler { return column.asString() + "i64"; String value = column.asString(); value = value.replace("\"", "\\\""); - if (colMeta.type.startsWith("BINARY")) + if (colMeta.type.startsWith("BINARY") || colMeta.type.startsWith("VARCHAR")) return "\"" + value + "\""; if (colMeta.type.startsWith("NCHAR")) return "L\"" + value + "\""; diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Schema3_0Manager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Schema3_0Manager.java new file mode 100644 index 00000000..a429be94 --- /dev/null +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/Schema3_0Manager.java @@ -0,0 +1,166 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import com.alibaba.datax.common.exception.DataXException; +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.SQLException; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +/** + * 适配 TDengine 3.0 的 SchemaManager + */ +public class Schema3_0Manager extends SchemaManager { + + private static final Logger LOG = LoggerFactory.getLogger(Schema3_0Manager.class); + + private final String dbname; + + public Schema3_0Manager(Connection conn, String dbname) { + super(conn); + this.dbname = dbname; + } + + @Override + public Map loadTableMeta(List tables) throws DataXException { + Map tableMetas = new HashMap<>(); + + try (Statement stmt = conn.createStatement()) { + ResultSet rs = stmt.executeQuery("select * from " + Constants.INFORMATION_SCHEMA + Constants.INFORMATION_SCHEMA_COMMA + + Constants.INFORMATION_SCHEMA_TABLE_INS_STABLES + " where db_name = " + getDbnameForSqlQuery()); + while (rs.next()) { + TableMeta tableMeta = buildSupTableMeta(rs); + if (!tables.contains(tableMeta.tbname)) + continue; + tableMetas.put(tableMeta.tbname, tableMeta); + } + + rs = stmt.executeQuery("select * from " + Constants.INFORMATION_SCHEMA + Constants.INFORMATION_SCHEMA_COMMA + + Constants.INFORMATION_SCHEMA_TABLE_INS_TABLES + " where db_name = " + getDbnameForSqlQuery()); + while (rs.next()) { + TableMeta tableMeta = buildSubTableMeta(rs); + if (!tables.contains(tableMeta.tbname)) + continue; + tableMetas.put(tableMeta.tbname, tableMeta); + } + + for (String tbname : tables) { + if (!tableMetas.containsKey(tbname)) { + throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, "table metadata of " + tbname + " is empty!"); + } + } + } catch (SQLException e) { + throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage()); + } + return tableMetas; + } + + private String getDbnameForSqlQuery() { + return "\"" + dbname + "\""; + } + + @Override + protected TableMeta buildSupTableMeta(ResultSet rs) throws SQLException { + TableMeta tableMeta = new TableMeta(); + tableMeta.tableType = TableType.SUP_TABLE; + tableMeta.tbname = rs.getString(Constants.TABLE_META_SUP_TABLE_NAME); + tableMeta.columns = rs.getInt(Constants.TABLE_META_COLUMNS); + tableMeta.tags = rs.getInt(Constants.TABLE_META_TAGS); +// tableMeta.tables = rs.getInt("tables"); // 直接从 ins_stables 查不到子表数量 + LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta); + return tableMeta; + } + + @Override + protected TableMeta buildSubTableMeta(ResultSet rs) throws SQLException { + TableMeta tableMeta = new TableMeta(); + String stable_name = rs.getString(Constants.TABLE_META_SUP_TABLE_NAME); + tableMeta.tableType = StringUtils.isBlank(stable_name) ? TableType.NML_TABLE : TableType.SUB_TABLE; + tableMeta.tbname = rs.getString(Constants.TABLE_META_TABLE_NAME); + tableMeta.columns = rs.getInt(Constants.TABLE_META_COLUMNS); + tableMeta.stable_name = StringUtils.isBlank(stable_name) ? null : stable_name; + LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta); + return tableMeta; + } + + @Override + protected ColumnMeta buildColumnMeta(ResultSet rs, boolean isPrimaryKey) throws SQLException { + ColumnMeta columnMeta = new ColumnMeta(); + columnMeta.field = rs.getString(Constants.COLUMN_META_FIELD); + columnMeta.type = rs.getString(Constants.COLUMN_META_TYPE); + columnMeta.length = rs.getInt(Constants.COLUMN_META_LENGTH); + columnMeta.note = rs.getString(Constants.COLUMN_META_NOTE); + columnMeta.isTag = Constants.COLUMN_META_NOTE_TAG.equals(columnMeta.note); + // columnMeta.isPrimaryKey = "ts".equals(columnMeta.field); + columnMeta.isPrimaryKey = isPrimaryKey; + return columnMeta; + } + + @Override + public TimestampPrecision loadDatabasePrecision() throws DataXException { + try (Statement stmt = conn.createStatement()) { + ResultSet rs = stmt.executeQuery("select * from " + Constants.INFORMATION_SCHEMA + Constants.INFORMATION_SCHEMA_COMMA + + Constants.INFORMATION_SCHEMA_TABLE_INS_DATABASES + " where name = " + getDbnameForSqlQuery()); + while (rs.next()) { + String precision = rs.getString(Constants.DATABASE_META_PRECISION); + switch (precision) { + case "ns": + this.precision = TimestampPrecision.NANOSEC; + break; + case "us": + this.precision = TimestampPrecision.MICROSEC; + break; + case "ms": + default: + this.precision = TimestampPrecision.MILLISEC; + } + } + } catch (SQLException e) { + throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage()); + } + return this.precision; + } + + @Override + public Map loadTagTableNameMap(String table) throws SQLException { + if (tags2tbnameMaps.containsKey(table)) + return tags2tbnameMaps.get(table); + Map tags2tbname = new HashMap<>(); + try (Statement stmt = conn.createStatement()) { + // describe table + List tags = new ArrayList<>(); + ResultSet rs = stmt.executeQuery("describe " + table); + while (rs.next()) { + String note = rs.getString(Constants.COLUMN_META_NOTE); + if (Constants.COLUMN_META_NOTE_TAG.equals(note)) { + tags.add(rs.getString(Constants.COLUMN_META_FIELD)); + } + } + // select distinct tbname, t1, t2 from stb + rs = stmt.executeQuery("select distinct " + String.join(",", tags) + ",tbname from " + table); + while (rs.next()) { + ResultSet finalRs = rs; + String tagStr = tags.stream().map(t -> { + try { + return finalRs.getString(t); + } catch (SQLException e) { + LOG.error(e.getMessage(), e); + } + return "NULL"; + }).collect(Collectors.joining(TAG_TABLE_NAME_MAP_KEY_SPLITTER)); + String tbname = rs.getString("tbname"); + tags2tbname.put(tagStr, tbname); + } + } + tags2tbnameMaps.put(table, tags2tbname); + return tags2tbname; + } +} diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java index fc0c002d..8cff3c88 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/SchemaManager.java @@ -12,11 +12,11 @@ import java.util.stream.Collectors; public class SchemaManager { private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class); // private static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = "_"; - private static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = ""; + protected static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = ""; - private final Connection conn; - private TimestampPrecision precision; - private Map> tags2tbnameMaps = new HashMap<>(); + protected final Connection conn; + protected TimestampPrecision precision; + protected Map> tags2tbnameMaps = new HashMap<>(); public SchemaManager(Connection conn) { this.conn = conn; @@ -135,7 +135,7 @@ public class SchemaManager { return ret; } - private TableMeta buildSupTableMeta(ResultSet rs) throws SQLException { + protected TableMeta buildSupTableMeta(ResultSet rs) throws SQLException { TableMeta tableMeta = new TableMeta(); tableMeta.tableType = TableType.SUP_TABLE; tableMeta.tbname = rs.getString("name"); @@ -147,7 +147,7 @@ public class SchemaManager { return tableMeta; } - private TableMeta buildSubTableMeta(ResultSet rs) throws SQLException { + protected TableMeta buildSubTableMeta(ResultSet rs) throws SQLException { TableMeta tableMeta = new TableMeta(); String stable_name = rs.getString("stable_name"); tableMeta.tableType = StringUtils.isBlank(stable_name) ? TableType.NML_TABLE : TableType.SUB_TABLE; @@ -159,7 +159,7 @@ public class SchemaManager { return tableMeta; } - private ColumnMeta buildColumnMeta(ResultSet rs, boolean isPrimaryKey) throws SQLException { + protected ColumnMeta buildColumnMeta(ResultSet rs, boolean isPrimaryKey) throws SQLException { ColumnMeta columnMeta = new ColumnMeta(); columnMeta.field = rs.getString("Field"); columnMeta.type = rs.getString("Type");