From 8d36ffc32f5ff4e189009d8577f009f64376ab03 Mon Sep 17 00:00:00 2001 From: zyyang Date: Sun, 14 Apr 2024 21:11:28 +0800 Subject: [PATCH] fix: add Schema Cache singleton#TD-29324 --- tdengine30reader/pom.xml | 2 +- tdengine30writer/pom.xml | 2 +- .../tdengine30writer/DefaultDataHandler.java | 240 +++++++++--------- .../writer/tdengine30writer/SchemaCache.java | 138 ++++++++++ .../tdengine30writer/TDengineWriter.java | 56 ++-- .../DefaultDataHandlerTest.java | 100 +++----- 6 files changed, 329 insertions(+), 209 deletions(-) create mode 100644 tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaCache.java diff --git a/tdengine30reader/pom.xml b/tdengine30reader/pom.xml index 2a82fc2b..5754fb2d 100644 --- a/tdengine30reader/pom.xml +++ b/tdengine30reader/pom.xml @@ -37,7 +37,7 @@ com.taosdata.jdbc taos-jdbcdriver - 3.2.5 + 3.2.9 junit diff --git a/tdengine30writer/pom.xml b/tdengine30writer/pom.xml index 94ef94f9..4b71a1b5 100644 --- a/tdengine30writer/pom.xml +++ b/tdengine30writer/pom.xml @@ -22,7 +22,7 @@ com.taosdata.jdbc taos-jdbcdriver - 3.2.5 + 3.2.9 org.apache.commons diff --git a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/DefaultDataHandler.java b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/DefaultDataHandler.java index 010795ef..3d02df80 100644 --- a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/DefaultDataHandler.java +++ b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/DefaultDataHandler.java @@ -33,32 +33,25 @@ public class DefaultDataHandler implements DataHandler { } private final TaskPluginCollector taskPluginCollector; - private String username; - private String password; - private String jdbcUrl; - private int batchSize; - private boolean ignoreTagsUnmatched; + private final String username; + private final String password; + private final String jdbcUrl; + private final int batchSize; + private final boolean ignoreTagsUnmatched; - private List tables; - private List columns; + private final List tables; + private final List columns; - private Map tableMetas; private SchemaManager schemaManager; + private final SchemaCache schemaCache; - public void setTableMetas(Map tableMetas) { - this.tableMetas = tableMetas; - } - - public void setTbnameColumnMetasMap(Map> tbnameColumnMetasMap) { - this.tbnameColumnMetasMap = tbnameColumnMetasMap; - } + //private Map tableMetas; + private Map> tbnameColumnMetasMap; public void setSchemaManager(SchemaManager schemaManager) { this.schemaManager = schemaManager; } - private Map> tbnameColumnMetasMap; - public DefaultDataHandler(Configuration configuration, TaskPluginCollector taskPluginCollector) { this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME); this.password = configuration.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD); @@ -66,8 +59,11 @@ public class DefaultDataHandler implements DataHandler { this.batchSize = configuration.getInt(Key.BATCH_SIZE, Constants.DEFAULT_BATCH_SIZE); this.tables = configuration.getList(Key.TABLE, String.class); this.columns = configuration.getList(Key.COLUMN, String.class); - this.ignoreTagsUnmatched = configuration.getBool(Key.IGNORE_TAGS_UNMATCHED, Constants.DEFAULT_IGNORE_TAGS_UNMATCHED); + this.ignoreTagsUnmatched = configuration.getBool(Key.IGNORE_TAGS_UNMATCHED, + Constants.DEFAULT_IGNORE_TAGS_UNMATCHED); this.taskPluginCollector = taskPluginCollector; + + this.schemaCache = SchemaCache.getInstance(configuration); } @Override @@ -77,14 +73,15 @@ public class DefaultDataHandler implements DataHandler { try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) { LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established."); - String dbname = getDbnameFromJdbcUrl(jdbcUrl); + String dbname = TDengineWriter.parseDatabaseFromJdbcUrl(jdbcUrl); Statement statement = conn.createStatement(); ResultSet resultSet = statement.executeQuery("select " + Constants.SERVER_VERSION); // 光标移动一行 resultSet.next(); String serverVersion = resultSet.getString(Constants.SERVER_VERSION); if (StringUtils.isEmpty(dbname)) { - throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, "error dbname parsed from jdbcUrl"); + throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, + "error dbname parsed from jdbcUrl"); } LOG.info("tdengine server version[{}] dbname[{}]", serverVersion, dbname); // 版本判断确定使用 SchemaManager 的示例 @@ -93,21 +90,6 @@ public class DefaultDataHandler implements DataHandler { } else { this.schemaManager = new Schema3_0Manager(conn, dbname); } - // prepare table_name -> table_meta - this.tableMetas = schemaManager.loadTableMeta(tables); - // prepare table_name -> column_meta - this.tbnameColumnMetasMap = schemaManager.loadColumnMetas(tables); - // filter column - for (String tableName : tbnameColumnMetasMap.keySet()) { - List columnMetaList = tbnameColumnMetasMap.get(tableName); - Iterator iterator = columnMetaList.iterator(); - while (iterator.hasNext()) { - ColumnMeta columnMeta = iterator.next(); - if (!this.columns.contains(columnMeta.field)) { - iterator.remove(); - } - } - } List recordBatch = new ArrayList<>(); Record record; @@ -141,26 +123,13 @@ public class DefaultDataHandler implements DataHandler { } if (affectedRows != count) { - LOG.error("write record missing or incorrect happened, affectedRows: " + affectedRows + ", total: " + count); + LOG.error( + "write record missing or incorrect happened, affectedRows: " + affectedRows + ", total: " + count); } return affectedRows; } - /** - * 从 jdbcUrl 中解析出数据库名称 - * @param jdbcUrl 格式是 jdbc:://:/[?可选参数] - * @return 数据库名称 - */ - private static String getDbnameFromJdbcUrl(String jdbcUrl) { - int questionMarkIndex = -1; - if (jdbcUrl.contains("?")) { - questionMarkIndex = jdbcUrl.indexOf("?"); - } - return questionMarkIndex == -1 ? jdbcUrl.substring(jdbcUrl.lastIndexOf("/") + 1) : - jdbcUrl.substring(jdbcUrl.lastIndexOf("/") + 1, questionMarkIndex); - } - private int writeEachRow(Connection conn, List recordBatch) { int affectedRows = 0; for (Record record : recordBatch) { @@ -190,7 +159,7 @@ public class DefaultDataHandler implements DataHandler { public int writeBatch(Connection conn, List recordBatch) throws SQLException { int affectedRows = 0; for (String table : tables) { - TableMeta tableMeta = tableMetas.get(table); + TableMeta tableMeta = this.schemaCache.getTableMeta(table); switch (tableMeta.tableType) { case SUP_TABLE: { if (columns.contains("tbname")) { @@ -212,17 +181,15 @@ public class DefaultDataHandler implements DataHandler { return affectedRows; } - private int writeBatchToSupTableWithoutTbname(Connection conn, String table, List recordBatch, Map tag2Tbname) throws SQLException { - List columnMetas = tbnameColumnMetasMap.get(table); + private int writeBatchToSupTableWithoutTbname(Connection conn, String table, List recordBatch, + Map tag2Tbname) throws SQLException { + List columnMetas = schemaCache.getColumnMetaList(table); List subTableExist = filterSubTableExistRecords(recordBatch, columnMetas, tag2Tbname); List subTableNotExist = filterSubTableNotExistRecords(recordBatch, columnMetas, tag2Tbname); int affectedRows = 0; Map> subTableRecordsMap = splitRecords(subTableExist, columnMetas, tag2Tbname); - List subTables = new ArrayList<>(subTableRecordsMap.keySet()); - this.tbnameColumnMetasMap.putAll(schemaManager.loadColumnMetas(subTables)); - for (String subTable : subTableRecordsMap.keySet()) { List subTableRecords = subTableRecordsMap.get(subTable); affectedRows += writeBatchToNormalTable(conn, subTable, subTableRecords); @@ -232,21 +199,24 @@ public class DefaultDataHandler implements DataHandler { return affectedRows; } - private List filterSubTableExistRecords(List recordBatch, List columnMetas, Map tag2Tbname) { + private List filterSubTableExistRecords(List recordBatch, List columnMetas, + Map tag2Tbname) { return recordBatch.stream().filter(record -> { String tagStr = getTagString(columnMetas, record); return tag2Tbname.containsKey(tagStr); }).collect(Collectors.toList()); } - private List filterSubTableNotExistRecords(List recordBatch, List columnMetas, Map tag2Tbname) { + private List filterSubTableNotExistRecords(List recordBatch, List columnMetas, + Map tag2Tbname) { return recordBatch.stream().filter(record -> { String tagStr = getTagString(columnMetas, record); return !tag2Tbname.containsKey(tagStr); }).collect(Collectors.toList()); } - private Map> splitRecords(List subTableExist, List columnMetas, Map tag2Tbname) { + private Map> splitRecords(List subTableExist, List columnMetas, + Map tag2Tbname) { Map> ret = new HashMap<>(); for (Record record : subTableExist) { String tagstr = getTagString(columnMetas, record); @@ -278,7 +248,9 @@ public class DefaultDataHandler implements DataHandler { return column.asString(); } } catch (Exception e) { - LOG.error("failed to get Tag, colIndex: " + colIndex + ", ColumnMeta: " + columnMeta + ", record: " + record, e); + LOG.error( + "failed to get Tag, colIndex: " + colIndex + ", ColumnMeta: " + columnMeta + ", record: " + + record, e); } } return ""; @@ -291,30 +263,32 @@ public class DefaultDataHandler implements DataHandler { * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], ) */ private int writeBatchToSupTableBySQL(Connection conn, String table, List recordBatch) throws SQLException { - List columnMetas = this.tbnameColumnMetasMap.get(table); + List columnMetas = this.schemaCache.getColumnMetaList(table); StringBuilder sb = new StringBuilder("insert into"); for (Record record : recordBatch) { - sb.append(" `").append(record.getColumn(indexOf("tbname")).asString()) - .append("` using ").append(table) - .append(" tags") - .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { - return colMeta.isTag; - }).map(colMeta -> { - return buildColumnValue(colMeta, record); - }).collect(Collectors.joining(",", "(", ")"))) - .append(" ") - .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { - return !colMeta.isTag; - }).map(colMeta -> { - return colMeta.field; - }).collect(Collectors.joining(",", "(", ")"))) - .append(" values") - .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { - return !colMeta.isTag; - }).map(colMeta -> { - return buildColumnValue(colMeta, record); - }).collect(Collectors.joining(",", "(", ")"))); + sb.append(" `") + .append(record.getColumn(indexOf("tbname")).asString()) + .append("` using ") + .append(table) + .append(" tags") + .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { + return colMeta.isTag; + }).map(colMeta -> { + return buildColumnValue(colMeta, record); + }).collect(Collectors.joining(",", "(", ")"))) + .append(" ") + .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { + return !colMeta.isTag; + }).map(colMeta -> { + return colMeta.field; + }).collect(Collectors.joining(",", "(", ")"))) + .append(" values") + .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { + return !colMeta.isTag; + }).map(colMeta -> { + return buildColumnValue(colMeta, record); + }).collect(Collectors.joining(",", "(", ")"))); } String sql = sb.toString(); @@ -371,33 +345,35 @@ public class DefaultDataHandler implements DataHandler { * table: ["stb1"], column: ["ts", "f1", "f2", "t1"] * data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => schemaless: stb1,t1=t1 f1=f1,f2=f2 ts */ - private int writeBatchToSupTableBySchemaless(Connection conn, String table, List recordBatch) throws SQLException { + private int writeBatchToSupTableBySchemaless(Connection conn, String table, + List recordBatch) throws SQLException { int count = 0; TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision(); - List columnMetaList = this.tbnameColumnMetasMap.get(table); + List columnMetaList = this.schemaCache.getColumnMetaList(table); ColumnMeta ts = columnMetaList.stream().filter(colMeta -> colMeta.isPrimaryKey).findFirst().get(); List lines = new ArrayList<>(); for (Record record : recordBatch) { StringBuilder sb = new StringBuilder(); - sb.append(table).append(",") - .append(columnMetaList.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { - return colMeta.isTag; - }).map(colMeta -> { - String value = record.getColumn(indexOf(colMeta.field)).asString(); - if (value.contains(" ")) - value = value.replace(" ", "\\ "); - return colMeta.field + "=" + value; - }).collect(Collectors.joining(","))) - .append(" ") - .append(columnMetaList.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { - return !colMeta.isTag && !colMeta.isPrimaryKey; - }).map(colMeta -> { - return colMeta.field + "=" + buildSchemalessColumnValue(colMeta, record); -// return colMeta.field + "=" + record.getColumn(indexOf(colMeta.field)).asString(); - }).collect(Collectors.joining(","))) - .append(" "); + sb.append(table) + .append(",") + .append(columnMetaList.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { + return colMeta.isTag; + }).map(colMeta -> { + String value = record.getColumn(indexOf(colMeta.field)).asString(); + if (value.contains(" ")) + value = value.replace(" ", "\\ "); + return colMeta.field + "=" + value; + }).collect(Collectors.joining(","))) + .append(" ") + .append(columnMetaList.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { + return !colMeta.isTag && !colMeta.isPrimaryKey; + }).map(colMeta -> { + return colMeta.field + "=" + buildSchemalessColumnValue(colMeta, record); + // return colMeta.field + "=" + record.getColumn(indexOf(colMeta.field)).asString(); + }).collect(Collectors.joining(","))) + .append(" "); // timestamp Column column = record.getColumn(indexOf(ts.field)); Object tsValue = column.getRawData(); @@ -512,28 +488,33 @@ public class DefaultDataHandler implements DataHandler { * insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)]) */ private int writeBatchToSubTable(Connection conn, String table, List recordBatch) throws SQLException { - List columnMetas = this.tbnameColumnMetasMap.get(table); + List columnMetas = this.schemaCache.getColumnMetaList(table); StringBuilder sb = new StringBuilder(); - sb.append("insert into `").append(table).append("` ") - .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { - return !colMeta.isTag; - }).map(colMeta -> { - return colMeta.field; - }).collect(Collectors.joining(",", "(", ")"))) - .append(" values"); + sb.append("insert into `") + .append(table) + .append("` ") + .append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { + return !colMeta.isTag; + }).map(colMeta -> { + return colMeta.field; + }).collect(Collectors.joining(",", "(", ")"))) + .append(" values"); int validRecords = 0; for (Record record : recordBatch) { if (columns.contains("tbname") && !table.equals(record.getColumn(indexOf("tbname")).asString())) continue; - boolean tagsAllMatch = columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> { - return colMeta.isTag; - }).allMatch(colMeta -> { - Column column = record.getColumn(indexOf(colMeta.field)); - boolean equals = equals(column, colMeta); - return equals; - }); + boolean tagsAllMatch = columnMetas.stream() + .filter(colMeta -> columns.contains(colMeta.field)) + .filter(colMeta -> { + return colMeta.isTag; + }) + .allMatch(colMeta -> { + Column column = record.getColumn(indexOf(colMeta.field)); + boolean equals = equals(column, colMeta); + return equals; + }); if (ignoreTagsUnmatched && !tagsAllMatch) continue; @@ -582,20 +563,29 @@ public class DefaultDataHandler implements DataHandler { * sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...) */ private int writeBatchToNormalTable(Connection conn, String table, List recordBatch) throws SQLException { - List columnMetas = this.tbnameColumnMetasMap.get(table); + List columnMetas = this.schemaCache.getColumnMetaList(table); StringBuilder sb = new StringBuilder(); - sb.append("insert into `").append(table) - .append("` ") - .append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> { - return colMeta.field; - }).collect(Collectors.joining(",", "(", ")"))) - .append(" values "); + sb.append("insert into `") + .append(table) + .append("` ") + .append(columnMetas.stream() + .filter(colMeta -> !colMeta.isTag) + .filter(colMeta -> columns.contains(colMeta.field)) + .map(colMeta -> { + return colMeta.field; + }) + .collect(Collectors.joining(",", "(", ")"))) + .append(" values "); for (Record record : recordBatch) { - sb.append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> { - return buildColumnValue(colMeta, record); - }).collect(Collectors.joining(",", "(", ")"))); + sb.append(columnMetas.stream() + .filter(colMeta -> !colMeta.isTag) + .filter(colMeta -> columns.contains(colMeta.field)) + .map(colMeta -> { + return buildColumnValue(colMeta, record); + }) + .collect(Collectors.joining(",", "(", ")"))); } String sql = sb.toString(); diff --git a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaCache.java b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaCache.java new file mode 100644 index 00000000..ebcd623a --- /dev/null +++ b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaCache.java @@ -0,0 +1,138 @@ +package com.alibaba.datax.plugin.writer.tdengine30writer; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; +import java.util.ArrayList; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.function.Predicate; + +/** + * Schema cache for TDengine 3.X + */ +public final class SchemaCache { + private static final Logger log = LoggerFactory.getLogger(TDengineWriter.Job.class); + + private static volatile SchemaCache instance; + + private static Configuration config; + private static Connection conn; + // table name -> TableMeta + private static final Map tableMetas = new LinkedHashMap<>(); + // table name ->List + private static final Map> columnMetas = new LinkedHashMap<>(); + + private SchemaCache(Configuration config) { + SchemaCache.config = config; + + // connect + final String user = config.getString(Key.USERNAME, Constants.DEFAULT_USERNAME); + final String pass = config.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD); + + Configuration conn = Configuration.from(config.getList(Key.CONNECTION).get(0).toString()); + + final String url = conn.getString(Key.JDBC_URL); + try { + SchemaCache.conn = DriverManager.getConnection(url, user, pass); + } catch (SQLException e) { + throw DataXException.asDataXException( + "failed to connect to url: " + url + ", cause: {" + e.getMessage() + "}"); + } + + final String dbname = TDengineWriter.parseDatabaseFromJdbcUrl(url); + SchemaManager schemaManager = new Schema3_0Manager(SchemaCache.conn, dbname); + + // init table meta cache and load + final List tables = conn.getList(Key.TABLE, String.class); + Map tableMetas = schemaManager.loadTableMeta(tables); + + // init column meta cache + SchemaCache.tableMetas.putAll(tableMetas); + for (String table : tableMetas.keySet()) { + SchemaCache.columnMetas.put(table, new ArrayList<>()); + } + } + + public static SchemaCache getInstance(Configuration originConfig) { + if (instance == null) { + synchronized (SchemaCache.class) { + if (instance == null) { + instance = new SchemaCache(originConfig); + } + } + } + return instance; + } + + public TableMeta getTableMeta(String table_name) { + return tableMetas.get(table_name); + } + + public List getColumnMetaList(String tbname) { + if (columnMetas.get(tbname).isEmpty()) { + synchronized (SchemaCache.class) { + if (columnMetas.get(tbname).isEmpty()) { + List column_name = config.getList(Key.COLUMN, String.class); + List colMetaList = getColumnMetaListFromDb(tbname, + (colMeta) -> column_name.contains(colMeta.field)); + columnMetas.get(tbname).addAll(colMetaList); + } + } + } + return columnMetas.get(tbname); + } + + private List getColumnMetaListFromDb(String tableName, Predicate filter) { + List columnMetaList = columnMetas.get(tableName); + + try (Statement stmt = conn.createStatement()) { + ResultSet rs = stmt.executeQuery("describe " + tableName); + for (int i = 0; rs.next(); i++) { + ColumnMeta columnMeta = buildColumnMeta(rs, i == 0); + + if (filter.test(columnMeta)) + columnMetaList.add(columnMeta); + } + } catch (SQLException e) { + throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage()); + } + + for (ColumnMeta colMeta : columnMetaList) { + if (!colMeta.isTag) + continue; + Object tagValue = getTagValue(tableName, colMeta.field); + colMeta.value = tagValue; + } + + return columnMetaList; + } + + private Object getTagValue(String tableName, String tagName) { + String sql = "select " + tagName + " from " + tableName; + try (Statement stmt = conn.createStatement()) { + ResultSet rs = stmt.executeQuery(sql); + rs.next(); + return rs.getObject(tagName); + } catch (SQLException e) { + throw DataXException.asDataXException("failed to get tag value, cause: {" + e.getMessage() + "}"); + } + } + + private ColumnMeta buildColumnMeta(ResultSet rs, boolean isPrimaryKey) throws SQLException { + ColumnMeta columnMeta = new ColumnMeta(); + columnMeta.field = rs.getString(Constants.COLUMN_META_FIELD); + columnMeta.type = rs.getString(Constants.COLUMN_META_TYPE); + columnMeta.length = rs.getInt(Constants.COLUMN_META_LENGTH); + columnMeta.note = rs.getString(Constants.COLUMN_META_NOTE); + columnMeta.isTag = Constants.COLUMN_META_NOTE_TAG.equals(columnMeta.note); + // columnMeta.isPrimaryKey = "ts".equals(columnMeta.field); + columnMeta.isPrimaryKey = isPrimaryKey; + return columnMeta; + } + +} diff --git a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/TDengineWriter.java b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/TDengineWriter.java index d0dde32b..3a128e7c 100644 --- a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/TDengineWriter.java +++ b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/TDengineWriter.java @@ -13,7 +13,6 @@ import java.util.ArrayList; import java.util.List; public class TDengineWriter extends Writer { - private static final String PEER_PLUGIN_NAME = "peerPluginName"; public static class Job extends Writer.Job { @@ -29,29 +28,28 @@ public class TDengineWriter extends Writer { // check username String user = this.originalConfig.getString(Key.USERNAME); if (StringUtils.isBlank(user)) - throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter [" - + Key.USERNAME + "] is not set."); + throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.USERNAME + "] is not set."); // check password String password = this.originalConfig.getString(Key.PASSWORD); if (StringUtils.isBlank(password)) - throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter [" - + Key.PASSWORD + "] is not set."); + throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.PASSWORD + "] is not set."); // check connection List connection = this.originalConfig.getList(Key.CONNECTION); if (connection == null || connection.isEmpty()) - throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter [" - + Key.CONNECTION + "] is not set."); + throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.CONNECTION + "] is not set."); if (connection.size() > 1) LOG.warn("connection.size is " + connection.size() + " and only connection[0] will be used."); Configuration conn = Configuration.from(connection.get(0).toString()); String jdbcUrl = conn.getString(Key.JDBC_URL); if (StringUtils.isBlank(jdbcUrl)) - throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter [" - + Key.JDBC_URL + "] of connection is not set."); + throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.JDBC_URL + "] of connection is not set."); - // check column } @Override @@ -63,14 +61,17 @@ public class TDengineWriter extends Writer { public List split(int mandatoryNumber) { List writerSplitConfigs = new ArrayList<>(); - List conns = this.originalConfig.getList(Key.CONNECTION); for (int i = 0; i < mandatoryNumber; i++) { Configuration clone = this.originalConfig.clone(); - Configuration conf = Configuration.from(conns.get(0).toString()); - String jdbcUrl = conf.getString(Key.JDBC_URL); + + Configuration config = Configuration.from( + this.originalConfig.getList(Key.CONNECTION).get(0).toString()); + + String jdbcUrl = config.getString(Key.JDBC_URL); clone.set(Key.JDBC_URL, jdbcUrl); - clone.set(Key.TABLE, conf.getList(Key.TABLE)); - clone.remove(Key.CONNECTION); + + clone.set(Key.TABLE, config.getList(Key.TABLE)); + writerSplitConfigs.add(clone); } @@ -81,12 +82,12 @@ public class TDengineWriter extends Writer { public static class Task extends Writer.Task { private static final Logger LOG = LoggerFactory.getLogger(Task.class); - private Configuration writerSliceConfig; + private Configuration writerConfig; private TaskPluginCollector taskPluginCollector; @Override public void init() { - this.writerSliceConfig = getPluginJobConf(); + this.writerConfig = getPluginJobConf(); this.taskPluginCollector = super.getTaskPluginCollector(); } @@ -97,18 +98,33 @@ public class TDengineWriter extends Writer { @Override public void startWrite(RecordReceiver lineReceiver) { - String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME); + String peerPluginName = this.writerConfig.getString(PEER_PLUGIN_NAME); LOG.debug("start to handle record from: " + peerPluginName); DataHandler handler; if (peerPluginName.equals("opentsdbreader")) - handler = new OpentsdbDataHandler(this.writerSliceConfig); + handler = new OpentsdbDataHandler(this.writerConfig); else - handler = new DefaultDataHandler(this.writerSliceConfig, this.taskPluginCollector); + handler = new DefaultDataHandler(this.writerConfig, this.taskPluginCollector); long records = handler.handle(lineReceiver, getTaskPluginCollector()); LOG.debug("handle data finished, records: " + records); } } + + /** + * 从 jdbcUrl 中解析出数据库名称 + * + * @param jdbcUrl 格式是 jdbc:://:/[?可选参数] + * @return 数据库名称 + */ + public static String parseDatabaseFromJdbcUrl(String jdbcUrl) { + int questionMarkIndex = -1; + if (jdbcUrl.contains("?")) { + questionMarkIndex = jdbcUrl.indexOf("?"); + } + return questionMarkIndex == -1 ? jdbcUrl.substring(jdbcUrl.lastIndexOf("/") + 1) : jdbcUrl.substring( + jdbcUrl.lastIndexOf("/") + 1, questionMarkIndex); + } } diff --git a/tdengine30writer/src/test/java/com/alibaba/datax/plugin/writer/tdengine30writer/DefaultDataHandlerTest.java b/tdengine30writer/src/test/java/com/alibaba/datax/plugin/writer/tdengine30writer/DefaultDataHandlerTest.java index 99609403..41164125 100644 --- a/tdengine30writer/src/test/java/com/alibaba/datax/plugin/writer/tdengine30writer/DefaultDataHandlerTest.java +++ b/tdengine30writer/src/test/java/com/alibaba/datax/plugin/writer/tdengine30writer/DefaultDataHandlerTest.java @@ -33,14 +33,10 @@ public class DefaultDataHandlerTest { public void writeSupTableBySQL() throws SQLException { // given createSupAndSubTable(); - Configuration configuration = Configuration.from("{" + - "\"username\": \"root\"," + - "\"password\": \"taosdata\"," + - "\"column\": [\"tbname\", \"ts\", \"f1\", \"f2\", \"t1\"]," + - "\"table\":[\"stb1\"]," + - "\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," + - "\"batchSize\": \"1000\"" + - "}"); + Configuration configuration = Configuration.from( + "{" + "\"username\": \"root\"," + "\"password\": \"taosdata\"," + + "\"column\": [\"tbname\", \"ts\", \"f1\", \"f2\", \"t1\"]," + "\"table\":[\"stb1\"]," + + "\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," + "\"batchSize\": \"1000\"" + "}"); long current = System.currentTimeMillis(); List recordList = IntStream.range(1, 11).mapToObj(i -> { Record record = new DefaultRecord(); @@ -59,8 +55,8 @@ public class DefaultDataHandlerTest { SchemaManager schemaManager = new SchemaManager(conn); Map tableMetas = schemaManager.loadTableMeta(tables); Map> columnMetas = schemaManager.loadColumnMetas(tables); - handler.setTableMetas(tableMetas); - handler.setTbnameColumnMetasMap(columnMetas); + //handler.setTableMetas(tableMetas); + //handler.setTbnameColumnMetasMap(columnMetas); handler.setSchemaManager(schemaManager); int count = handler.writeBatch(conn, recordList); @@ -73,14 +69,10 @@ public class DefaultDataHandlerTest { public void writeSupTableBySQL_2() throws SQLException { // given createSupAndSubTable(); - Configuration configuration = Configuration.from("{" + - "\"username\": \"root\"," + - "\"password\": \"taosdata\"," + - "\"column\": [\"tbname\", \"ts\", \"f1\", \"t1\"]," + - "\"table\":[\"stb1\"]," + - "\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," + - "\"batchSize\": \"1000\"" + - "}"); + Configuration configuration = Configuration.from( + "{" + "\"username\": \"root\"," + "\"password\": \"taosdata\"," + + "\"column\": [\"tbname\", \"ts\", \"f1\", \"t1\"]," + "\"table\":[\"stb1\"]," + + "\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," + "\"batchSize\": \"1000\"" + "}"); long current = System.currentTimeMillis(); List recordList = IntStream.range(1, 11).mapToObj(i -> { Record record = new DefaultRecord(); @@ -97,8 +89,8 @@ public class DefaultDataHandlerTest { SchemaManager schemaManager = new SchemaManager(conn); Map tableMetas = schemaManager.loadTableMeta(tables); Map> columnMetas = schemaManager.loadColumnMetas(tables); - handler.setTableMetas(tableMetas); - handler.setTbnameColumnMetasMap(columnMetas); + //handler.setTableMetas(tableMetas); + //handler.setTbnameColumnMetasMap(columnMetas); handler.setSchemaManager(schemaManager); int count = handler.writeBatch(conn, recordList); @@ -111,14 +103,10 @@ public class DefaultDataHandlerTest { public void writeSupTableBySchemaless() throws SQLException { // given createSupTable(); - Configuration configuration = Configuration.from("{" + - "\"username\": \"root\"," + - "\"password\": \"taosdata\"," + - "\"column\": [\"ts\", \"f1\", \"f2\", \"t1\"]," + - "\"table\":[\"stb1\"]," + - "\"jdbcUrl\":\"jdbc:TAOS://" + host + ":6030/scm_test\"," + - "\"batchSize\": \"1000\"" + - "}"); + Configuration configuration = Configuration.from( + "{" + "\"username\": \"root\"," + "\"password\": \"taosdata\"," + + "\"column\": [\"ts\", \"f1\", \"f2\", \"t1\"]," + "\"table\":[\"stb1\"]," + + "\"jdbcUrl\":\"jdbc:TAOS://" + host + ":6030/scm_test\"," + "\"batchSize\": \"1000\"" + "}"); String jdbcUrl = configuration.getString("jdbcUrl"); Connection connection = DriverManager.getConnection(jdbcUrl, "root", "taosdata"); long current = System.currentTimeMillis(); @@ -137,8 +125,8 @@ public class DefaultDataHandlerTest { SchemaManager schemaManager = new SchemaManager(connection); Map tableMetas = schemaManager.loadTableMeta(tables); Map> columnMetas = schemaManager.loadColumnMetas(tables); - handler.setTableMetas(tableMetas); - handler.setTbnameColumnMetasMap(columnMetas); + //handler.setTableMetas(tableMetas); + //handler.setTbnameColumnMetasMap(columnMetas); handler.setSchemaManager(schemaManager); int count = handler.writeBatch(connection, recordList); @@ -151,14 +139,10 @@ public class DefaultDataHandlerTest { public void writeSubTableWithTableName() throws SQLException { // given createSupAndSubTable(); - Configuration configuration = Configuration.from("{" + - "\"username\": \"root\"," + - "\"password\": \"taosdata\"," + - "\"column\": [\"tbname\", \"ts\", \"f1\", \"f2\", \"t1\"]," + - "\"table\":[\"tb1\"]," + - "\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," + - "\"batchSize\": \"1000\"" + - "}"); + Configuration configuration = Configuration.from( + "{" + "\"username\": \"root\"," + "\"password\": \"taosdata\"," + + "\"column\": [\"tbname\", \"ts\", \"f1\", \"f2\", \"t1\"]," + "\"table\":[\"tb1\"]," + + "\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," + "\"batchSize\": \"1000\"" + "}"); long current = System.currentTimeMillis(); List recordList = IntStream.range(1, 11).mapToObj(i -> { Record record = new DefaultRecord(); @@ -176,8 +160,8 @@ public class DefaultDataHandlerTest { SchemaManager schemaManager = new SchemaManager(conn); Map tableMetas = schemaManager.loadTableMeta(tables); Map> columnMetas = schemaManager.loadColumnMetas(tables); - handler.setTableMetas(tableMetas); - handler.setTbnameColumnMetasMap(columnMetas); + //handler.setTableMetas(tableMetas); + //handler.setTbnameColumnMetasMap(columnMetas); handler.setSchemaManager(schemaManager); int count = handler.writeBatch(conn, recordList); @@ -190,15 +174,11 @@ public class DefaultDataHandlerTest { public void writeSubTableWithoutTableName() throws SQLException { // given createSupAndSubTable(); - Configuration configuration = Configuration.from("{" + - "\"username\": \"root\"," + - "\"password\": \"taosdata\"," + - "\"column\": [\"ts\", \"f1\", \"f2\", \"t1\"]," + - "\"table\":[\"tb1\"]," + - "\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," + - "\"batchSize\": \"1000\"," + - "\"ignoreTagsUnmatched\": \"true\"" + - "}"); + Configuration configuration = Configuration.from( + "{" + "\"username\": \"root\"," + "\"password\": \"taosdata\"," + + "\"column\": [\"ts\", \"f1\", \"f2\", \"t1\"]," + "\"table\":[\"tb1\"]," + + "\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," + "\"batchSize\": \"1000\"," + + "\"ignoreTagsUnmatched\": \"true\"" + "}"); long current = System.currentTimeMillis(); List recordList = IntStream.range(1, 11).mapToObj(i -> { Record record = new DefaultRecord(); @@ -215,8 +195,8 @@ public class DefaultDataHandlerTest { SchemaManager schemaManager = new SchemaManager(conn); Map tableMetas = schemaManager.loadTableMeta(tables); Map> columnMetas = schemaManager.loadColumnMetas(tables); - handler.setTableMetas(tableMetas); - handler.setTbnameColumnMetasMap(columnMetas); + //handler.setTableMetas(tableMetas); + //handler.setTbnameColumnMetasMap(columnMetas); handler.setSchemaManager(schemaManager); int count = handler.writeBatch(conn, recordList); @@ -229,15 +209,11 @@ public class DefaultDataHandlerTest { public void writeNormalTable() throws SQLException { // given createSupAndSubTable(); - Configuration configuration = Configuration.from("{" + - "\"username\": \"root\"," + - "\"password\": \"taosdata\"," + - "\"column\": [\"ts\", \"f1\", \"f2\", \"t1\"]," + - "\"table\":[\"weather\"]," + - "\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," + - "\"batchSize\": \"1000\"," + - "\"ignoreTagsUnmatched\": \"true\"" + - "}"); + Configuration configuration = Configuration.from( + "{" + "\"username\": \"root\"," + "\"password\": \"taosdata\"," + + "\"column\": [\"ts\", \"f1\", \"f2\", \"t1\"]," + "\"table\":[\"weather\"]," + + "\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," + "\"batchSize\": \"1000\"," + + "\"ignoreTagsUnmatched\": \"true\"" + "}"); long current = System.currentTimeMillis(); List recordList = IntStream.range(1, 11).mapToObj(i -> { Record record = new DefaultRecord(); @@ -254,8 +230,8 @@ public class DefaultDataHandlerTest { SchemaManager schemaManager = new SchemaManager(conn); Map tableMetas = schemaManager.loadTableMeta(tables); Map> columnMetas = schemaManager.loadColumnMetas(tables); - handler.setTableMetas(tableMetas); - handler.setTbnameColumnMetasMap(columnMetas); + //handler.setTableMetas(tableMetas); + //handler.setTbnameColumnMetasMap(columnMetas); handler.setSchemaManager(schemaManager); int count = handler.writeBatch(conn, recordList);