feat: tdengine 3.0 adaptation

This commit is contained in:
秦冲 2022-12-28 11:03:11 +08:00
parent 73dffe31a6
commit 056702bcba
3 changed files with 205 additions and 10 deletions

View File

@ -9,6 +9,7 @@ import com.alibaba.datax.common.util.Configuration;
import com.taosdata.jdbc.SchemalessWriter; import com.taosdata.jdbc.SchemalessWriter;
import com.taosdata.jdbc.enums.SchemalessProtocolType; import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType; import com.taosdata.jdbc.enums.SchemalessTimestampType;
import com.taosdata.jdbc.utils.StringUtils;
import com.taosdata.jdbc.utils.Utils; import com.taosdata.jdbc.utils.Utils;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -76,12 +77,26 @@ public class DefaultDataHandler implements DataHandler {
try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) { try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established."); LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established.");
String dbname = getDbnameFromJdbcUrl(jdbcUrl);
Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery("select " + Constants.SERVER_VERSION);
// 光标移动一行
resultSet.next();
String serverVersion = resultSet.getString(Constants.SERVER_VERSION);
if (StringUtils.isEmpty(dbname)) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, "error dbname parsed from jdbcUrl");
}
LOG.info("tdengine server version[{}] dbname[{}]", serverVersion, dbname);
// 版本判断确定使用 SchemaManager 的示例
if (serverVersion.startsWith(Constants.SERVER_VERSION_2)) {
this.schemaManager = new SchemaManager(conn);
} else {
this.schemaManager = new Schema3_0Manager(conn, dbname);
}
// prepare table_name -> table_meta // prepare table_name -> table_meta
this.schemaManager = new SchemaManager(conn);
this.tableMetas = schemaManager.loadTableMeta(tables); this.tableMetas = schemaManager.loadTableMeta(tables);
// prepare table_name -> column_meta // prepare table_name -> column_meta
this.tbnameColumnMetasMap = schemaManager.loadColumnMetas(tables); this.tbnameColumnMetasMap = schemaManager.loadColumnMetas(tables);
// filter column // filter column
for (String tableName : tbnameColumnMetasMap.keySet()) { for (String tableName : tbnameColumnMetasMap.keySet()) {
List<ColumnMeta> columnMetaList = tbnameColumnMetasMap.get(tableName); List<ColumnMeta> columnMetaList = tbnameColumnMetasMap.get(tableName);
@ -132,6 +147,20 @@ public class DefaultDataHandler implements DataHandler {
return affectedRows; return affectedRows;
} }
/**
* jdbcUrl 中解析出数据库名称
* @param jdbcUrl 格式是 jdbc:<protocol>://<host>:<port>/<dbname>[?可选参数]
* @return 数据库名称
*/
private static String getDbnameFromJdbcUrl(String jdbcUrl) {
int questionMarkIndex = -1;
if (jdbcUrl.contains("?")) {
questionMarkIndex = jdbcUrl.indexOf("?");
}
return questionMarkIndex == -1 ? jdbcUrl.substring(jdbcUrl.lastIndexOf("/") + 1) :
jdbcUrl.substring(jdbcUrl.lastIndexOf("/") + 1, questionMarkIndex);
}
private int writeEachRow(Connection conn, List<Record> recordBatch) { private int writeEachRow(Connection conn, List<Record> recordBatch) {
int affectedRows = 0; int affectedRows = 0;
for (Record record : recordBatch) { for (Record record : recordBatch) {
@ -465,7 +494,7 @@ public class DefaultDataHandler implements DataHandler {
return column.asString() + "i64"; return column.asString() + "i64";
String value = column.asString(); String value = column.asString();
value = value.replace("\"", "\\\""); value = value.replace("\"", "\\\"");
if (colMeta.type.startsWith("BINARY")) if (colMeta.type.startsWith("BINARY") || colMeta.type.startsWith("VARCHAR"))
return "\"" + value + "\""; return "\"" + value + "\"";
if (colMeta.type.startsWith("NCHAR")) if (colMeta.type.startsWith("NCHAR"))
return "L\"" + value + "\""; return "L\"" + value + "\"";

View File

@ -0,0 +1,166 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.exception.DataXException;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* 适配 TDengine 3.0 SchemaManager
*/
public class Schema3_0Manager extends SchemaManager {
private static final Logger LOG = LoggerFactory.getLogger(Schema3_0Manager.class);
private final String dbname;
public Schema3_0Manager(Connection conn, String dbname) {
super(conn);
this.dbname = dbname;
}
@Override
public Map<String, TableMeta> loadTableMeta(List<String> tables) throws DataXException {
Map<String, TableMeta> tableMetas = new HashMap<>();
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("select * from " + Constants.INFORMATION_SCHEMA + Constants.INFORMATION_SCHEMA_COMMA
+ Constants.INFORMATION_SCHEMA_TABLE_INS_STABLES + " where db_name = " + getDbnameForSqlQuery());
while (rs.next()) {
TableMeta tableMeta = buildSupTableMeta(rs);
if (!tables.contains(tableMeta.tbname))
continue;
tableMetas.put(tableMeta.tbname, tableMeta);
}
rs = stmt.executeQuery("select * from " + Constants.INFORMATION_SCHEMA + Constants.INFORMATION_SCHEMA_COMMA
+ Constants.INFORMATION_SCHEMA_TABLE_INS_TABLES + " where db_name = " + getDbnameForSqlQuery());
while (rs.next()) {
TableMeta tableMeta = buildSubTableMeta(rs);
if (!tables.contains(tableMeta.tbname))
continue;
tableMetas.put(tableMeta.tbname, tableMeta);
}
for (String tbname : tables) {
if (!tableMetas.containsKey(tbname)) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, "table metadata of " + tbname + " is empty!");
}
}
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
return tableMetas;
}
private String getDbnameForSqlQuery() {
return "\"" + dbname + "\"";
}
@Override
protected TableMeta buildSupTableMeta(ResultSet rs) throws SQLException {
TableMeta tableMeta = new TableMeta();
tableMeta.tableType = TableType.SUP_TABLE;
tableMeta.tbname = rs.getString(Constants.TABLE_META_SUP_TABLE_NAME);
tableMeta.columns = rs.getInt(Constants.TABLE_META_COLUMNS);
tableMeta.tags = rs.getInt(Constants.TABLE_META_TAGS);
// tableMeta.tables = rs.getInt("tables"); // 直接从 ins_stables 查不到子表数量
LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta);
return tableMeta;
}
@Override
protected TableMeta buildSubTableMeta(ResultSet rs) throws SQLException {
TableMeta tableMeta = new TableMeta();
String stable_name = rs.getString(Constants.TABLE_META_SUP_TABLE_NAME);
tableMeta.tableType = StringUtils.isBlank(stable_name) ? TableType.NML_TABLE : TableType.SUB_TABLE;
tableMeta.tbname = rs.getString(Constants.TABLE_META_TABLE_NAME);
tableMeta.columns = rs.getInt(Constants.TABLE_META_COLUMNS);
tableMeta.stable_name = StringUtils.isBlank(stable_name) ? null : stable_name;
LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta);
return tableMeta;
}
@Override
protected ColumnMeta buildColumnMeta(ResultSet rs, boolean isPrimaryKey) throws SQLException {
ColumnMeta columnMeta = new ColumnMeta();
columnMeta.field = rs.getString(Constants.COLUMN_META_FIELD);
columnMeta.type = rs.getString(Constants.COLUMN_META_TYPE);
columnMeta.length = rs.getInt(Constants.COLUMN_META_LENGTH);
columnMeta.note = rs.getString(Constants.COLUMN_META_NOTE);
columnMeta.isTag = Constants.COLUMN_META_NOTE_TAG.equals(columnMeta.note);
// columnMeta.isPrimaryKey = "ts".equals(columnMeta.field);
columnMeta.isPrimaryKey = isPrimaryKey;
return columnMeta;
}
@Override
public TimestampPrecision loadDatabasePrecision() throws DataXException {
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("select * from " + Constants.INFORMATION_SCHEMA + Constants.INFORMATION_SCHEMA_COMMA
+ Constants.INFORMATION_SCHEMA_TABLE_INS_DATABASES + " where name = " + getDbnameForSqlQuery());
while (rs.next()) {
String precision = rs.getString(Constants.DATABASE_META_PRECISION);
switch (precision) {
case "ns":
this.precision = TimestampPrecision.NANOSEC;
break;
case "us":
this.precision = TimestampPrecision.MICROSEC;
break;
case "ms":
default:
this.precision = TimestampPrecision.MILLISEC;
}
}
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
return this.precision;
}
@Override
public Map<String, String> loadTagTableNameMap(String table) throws SQLException {
if (tags2tbnameMaps.containsKey(table))
return tags2tbnameMaps.get(table);
Map<String, String> tags2tbname = new HashMap<>();
try (Statement stmt = conn.createStatement()) {
// describe table
List<String> tags = new ArrayList<>();
ResultSet rs = stmt.executeQuery("describe " + table);
while (rs.next()) {
String note = rs.getString(Constants.COLUMN_META_NOTE);
if (Constants.COLUMN_META_NOTE_TAG.equals(note)) {
tags.add(rs.getString(Constants.COLUMN_META_FIELD));
}
}
// select distinct tbname, t1, t2 from stb
rs = stmt.executeQuery("select distinct " + String.join(",", tags) + ",tbname from " + table);
while (rs.next()) {
ResultSet finalRs = rs;
String tagStr = tags.stream().map(t -> {
try {
return finalRs.getString(t);
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
return "NULL";
}).collect(Collectors.joining(TAG_TABLE_NAME_MAP_KEY_SPLITTER));
String tbname = rs.getString("tbname");
tags2tbname.put(tagStr, tbname);
}
}
tags2tbnameMaps.put(table, tags2tbname);
return tags2tbname;
}
}

View File

@ -12,11 +12,11 @@ import java.util.stream.Collectors;
public class SchemaManager { public class SchemaManager {
private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class); private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class);
// private static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = "_"; // private static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = "_";
private static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = ""; protected static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = "";
private final Connection conn; protected final Connection conn;
private TimestampPrecision precision; protected TimestampPrecision precision;
private Map<String, Map<String, String>> tags2tbnameMaps = new HashMap<>(); protected Map<String, Map<String, String>> tags2tbnameMaps = new HashMap<>();
public SchemaManager(Connection conn) { public SchemaManager(Connection conn) {
this.conn = conn; this.conn = conn;
@ -135,7 +135,7 @@ public class SchemaManager {
return ret; return ret;
} }
private TableMeta buildSupTableMeta(ResultSet rs) throws SQLException { protected TableMeta buildSupTableMeta(ResultSet rs) throws SQLException {
TableMeta tableMeta = new TableMeta(); TableMeta tableMeta = new TableMeta();
tableMeta.tableType = TableType.SUP_TABLE; tableMeta.tableType = TableType.SUP_TABLE;
tableMeta.tbname = rs.getString("name"); tableMeta.tbname = rs.getString("name");
@ -147,7 +147,7 @@ public class SchemaManager {
return tableMeta; return tableMeta;
} }
private TableMeta buildSubTableMeta(ResultSet rs) throws SQLException { protected TableMeta buildSubTableMeta(ResultSet rs) throws SQLException {
TableMeta tableMeta = new TableMeta(); TableMeta tableMeta = new TableMeta();
String stable_name = rs.getString("stable_name"); String stable_name = rs.getString("stable_name");
tableMeta.tableType = StringUtils.isBlank(stable_name) ? TableType.NML_TABLE : TableType.SUB_TABLE; tableMeta.tableType = StringUtils.isBlank(stable_name) ? TableType.NML_TABLE : TableType.SUB_TABLE;
@ -159,7 +159,7 @@ public class SchemaManager {
return tableMeta; return tableMeta;
} }
private ColumnMeta buildColumnMeta(ResultSet rs, boolean isPrimaryKey) throws SQLException { protected ColumnMeta buildColumnMeta(ResultSet rs, boolean isPrimaryKey) throws SQLException {
ColumnMeta columnMeta = new ColumnMeta(); ColumnMeta columnMeta = new ColumnMeta();
columnMeta.field = rs.getString("Field"); columnMeta.field = rs.getString("Field");
columnMeta.type = rs.getString("Type"); columnMeta.type = rs.getString("Type");