fix: add SchemaCache singleton #TD-29324

This commit is contained in:
zyyang 2024-04-14 21:11:28 +08:00
parent 12fe8776a2
commit 8d36ffc32f
6 changed files with 329 additions and 209 deletions

View File

@ -37,7 +37,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.2.5</version>
<version>3.2.9</version>
</dependency>
<dependency>
<groupId>junit</groupId>

View File

@ -22,7 +22,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>3.2.5</version>
<version>3.2.9</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>

View File

@ -33,32 +33,25 @@ public class DefaultDataHandler implements DataHandler {
}
private final TaskPluginCollector taskPluginCollector;
private String username;
private String password;
private String jdbcUrl;
private int batchSize;
private boolean ignoreTagsUnmatched;
private final String username;
private final String password;
private final String jdbcUrl;
private final int batchSize;
private final boolean ignoreTagsUnmatched;
private List<String> tables;
private List<String> columns;
private final List<String> tables;
private final List<String> columns;
private Map<String, TableMeta> tableMetas;
private SchemaManager schemaManager;
private final SchemaCache schemaCache;
public void setTableMetas(Map<String, TableMeta> tableMetas) {
this.tableMetas = tableMetas;
}
public void setTbnameColumnMetasMap(Map<String, List<ColumnMeta>> tbnameColumnMetasMap) {
this.tbnameColumnMetasMap = tbnameColumnMetasMap;
}
//private Map<String, TableMeta> tableMetas;
private Map<String, List<ColumnMeta>> tbnameColumnMetasMap;
public void setSchemaManager(SchemaManager schemaManager) {
this.schemaManager = schemaManager;
}
private Map<String, List<ColumnMeta>> tbnameColumnMetasMap;
public DefaultDataHandler(Configuration configuration, TaskPluginCollector taskPluginCollector) {
this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME);
this.password = configuration.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD);
@ -66,8 +59,11 @@ public class DefaultDataHandler implements DataHandler {
this.batchSize = configuration.getInt(Key.BATCH_SIZE, Constants.DEFAULT_BATCH_SIZE);
this.tables = configuration.getList(Key.TABLE, String.class);
this.columns = configuration.getList(Key.COLUMN, String.class);
this.ignoreTagsUnmatched = configuration.getBool(Key.IGNORE_TAGS_UNMATCHED, Constants.DEFAULT_IGNORE_TAGS_UNMATCHED);
this.ignoreTagsUnmatched = configuration.getBool(Key.IGNORE_TAGS_UNMATCHED,
Constants.DEFAULT_IGNORE_TAGS_UNMATCHED);
this.taskPluginCollector = taskPluginCollector;
this.schemaCache = SchemaCache.getInstance(configuration);
}
@Override
@ -77,14 +73,15 @@ public class DefaultDataHandler implements DataHandler {
try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established.");
String dbname = getDbnameFromJdbcUrl(jdbcUrl);
String dbname = TDengineWriter.parseDatabaseFromJdbcUrl(jdbcUrl);
Statement statement = conn.createStatement();
ResultSet resultSet = statement.executeQuery("select " + Constants.SERVER_VERSION);
// 光标移动一行
resultSet.next();
String serverVersion = resultSet.getString(Constants.SERVER_VERSION);
if (StringUtils.isEmpty(dbname)) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, "error dbname parsed from jdbcUrl");
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION,
"error dbname parsed from jdbcUrl");
}
LOG.info("tdengine server version[{}] dbname[{}]", serverVersion, dbname);
// 版本判断确定使用 SchemaManager 的示例
@ -93,21 +90,6 @@ public class DefaultDataHandler implements DataHandler {
} else {
this.schemaManager = new Schema3_0Manager(conn, dbname);
}
// prepare table_name -> table_meta
this.tableMetas = schemaManager.loadTableMeta(tables);
// prepare table_name -> column_meta
this.tbnameColumnMetasMap = schemaManager.loadColumnMetas(tables);
// filter column
for (String tableName : tbnameColumnMetasMap.keySet()) {
List<ColumnMeta> columnMetaList = tbnameColumnMetasMap.get(tableName);
Iterator<ColumnMeta> iterator = columnMetaList.iterator();
while (iterator.hasNext()) {
ColumnMeta columnMeta = iterator.next();
if (!this.columns.contains(columnMeta.field)) {
iterator.remove();
}
}
}
List<Record> recordBatch = new ArrayList<>();
Record record;
@ -141,26 +123,13 @@ public class DefaultDataHandler implements DataHandler {
}
if (affectedRows != count) {
LOG.error("write record missing or incorrect happened, affectedRows: " + affectedRows + ", total: " + count);
LOG.error(
"write record missing or incorrect happened, affectedRows: " + affectedRows + ", total: " + count);
}
return affectedRows;
}
/**
* jdbcUrl 中解析出数据库名称
* @param jdbcUrl 格式是 jdbc:<protocol>://<host>:<port>/<dbname>[?可选参数]
* @return 数据库名称
*/
private static String getDbnameFromJdbcUrl(String jdbcUrl) {
int questionMarkIndex = -1;
if (jdbcUrl.contains("?")) {
questionMarkIndex = jdbcUrl.indexOf("?");
}
return questionMarkIndex == -1 ? jdbcUrl.substring(jdbcUrl.lastIndexOf("/") + 1) :
jdbcUrl.substring(jdbcUrl.lastIndexOf("/") + 1, questionMarkIndex);
}
private int writeEachRow(Connection conn, List<Record> recordBatch) {
int affectedRows = 0;
for (Record record : recordBatch) {
@ -190,7 +159,7 @@ public class DefaultDataHandler implements DataHandler {
public int writeBatch(Connection conn, List<Record> recordBatch) throws SQLException {
int affectedRows = 0;
for (String table : tables) {
TableMeta tableMeta = tableMetas.get(table);
TableMeta tableMeta = this.schemaCache.getTableMeta(table);
switch (tableMeta.tableType) {
case SUP_TABLE: {
if (columns.contains("tbname")) {
@ -212,17 +181,15 @@ public class DefaultDataHandler implements DataHandler {
return affectedRows;
}
private int writeBatchToSupTableWithoutTbname(Connection conn, String table, List<Record> recordBatch, Map<String, String> tag2Tbname) throws SQLException {
List<ColumnMeta> columnMetas = tbnameColumnMetasMap.get(table);
private int writeBatchToSupTableWithoutTbname(Connection conn, String table, List<Record> recordBatch,
Map<String, String> tag2Tbname) throws SQLException {
List<ColumnMeta> columnMetas = schemaCache.getColumnMetaList(table);
List<Record> subTableExist = filterSubTableExistRecords(recordBatch, columnMetas, tag2Tbname);
List<Record> subTableNotExist = filterSubTableNotExistRecords(recordBatch, columnMetas, tag2Tbname);
int affectedRows = 0;
Map<String, List<Record>> subTableRecordsMap = splitRecords(subTableExist, columnMetas, tag2Tbname);
List<String> subTables = new ArrayList<>(subTableRecordsMap.keySet());
this.tbnameColumnMetasMap.putAll(schemaManager.loadColumnMetas(subTables));
for (String subTable : subTableRecordsMap.keySet()) {
List<Record> subTableRecords = subTableRecordsMap.get(subTable);
affectedRows += writeBatchToNormalTable(conn, subTable, subTableRecords);
@ -232,21 +199,24 @@ public class DefaultDataHandler implements DataHandler {
return affectedRows;
}
private List<Record> filterSubTableExistRecords(List<Record> recordBatch, List<ColumnMeta> columnMetas, Map<String, String> tag2Tbname) {
private List<Record> filterSubTableExistRecords(List<Record> recordBatch, List<ColumnMeta> columnMetas,
Map<String, String> tag2Tbname) {
return recordBatch.stream().filter(record -> {
String tagStr = getTagString(columnMetas, record);
return tag2Tbname.containsKey(tagStr);
}).collect(Collectors.toList());
}
private List<Record> filterSubTableNotExistRecords(List<Record> recordBatch, List<ColumnMeta> columnMetas, Map<String, String> tag2Tbname) {
private List<Record> filterSubTableNotExistRecords(List<Record> recordBatch, List<ColumnMeta> columnMetas,
Map<String, String> tag2Tbname) {
return recordBatch.stream().filter(record -> {
String tagStr = getTagString(columnMetas, record);
return !tag2Tbname.containsKey(tagStr);
}).collect(Collectors.toList());
}
private Map<String, List<Record>> splitRecords(List<Record> subTableExist, List<ColumnMeta> columnMetas, Map<String, String> tag2Tbname) {
private Map<String, List<Record>> splitRecords(List<Record> subTableExist, List<ColumnMeta> columnMetas,
Map<String, String> tag2Tbname) {
Map<String, List<Record>> ret = new HashMap<>();
for (Record record : subTableExist) {
String tagstr = getTagString(columnMetas, record);
@ -278,7 +248,9 @@ public class DefaultDataHandler implements DataHandler {
return column.asString();
}
} catch (Exception e) {
LOG.error("failed to get Tag, colIndex: " + colIndex + ", ColumnMeta: " + columnMeta + ", record: " + record, e);
LOG.error(
"failed to get Tag, colIndex: " + colIndex + ", ColumnMeta: " + columnMeta + ", record: " +
record, e);
}
}
return "";
@ -291,12 +263,14 @@ public class DefaultDataHandler implements DataHandler {
* record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
*/
private int writeBatchToSupTableBySQL(Connection conn, String table, List<Record> recordBatch) throws SQLException {
List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);
List<ColumnMeta> columnMetas = this.schemaCache.getColumnMetaList(table);
StringBuilder sb = new StringBuilder("insert into");
for (Record record : recordBatch) {
sb.append(" `").append(record.getColumn(indexOf("tbname")).asString())
.append("` using ").append(table)
sb.append(" `")
.append(record.getColumn(indexOf("tbname")).asString())
.append("` using ")
.append(table)
.append(" tags")
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return colMeta.isTag;
@ -371,17 +345,19 @@ public class DefaultDataHandler implements DataHandler {
* table: ["stb1"], column: ["ts", "f1", "f2", "t1"]
* data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => schemaless: stb1,t1=t1 f1=f1,f2=f2 ts
*/
private int writeBatchToSupTableBySchemaless(Connection conn, String table, List<Record> recordBatch) throws SQLException {
private int writeBatchToSupTableBySchemaless(Connection conn, String table,
List<Record> recordBatch) throws SQLException {
int count = 0;
TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
List<ColumnMeta> columnMetaList = this.tbnameColumnMetasMap.get(table);
List<ColumnMeta> columnMetaList = this.schemaCache.getColumnMetaList(table);
ColumnMeta ts = columnMetaList.stream().filter(colMeta -> colMeta.isPrimaryKey).findFirst().get();
List<String> lines = new ArrayList<>();
for (Record record : recordBatch) {
StringBuilder sb = new StringBuilder();
sb.append(table).append(",")
sb.append(table)
.append(",")
.append(columnMetaList.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return colMeta.isTag;
}).map(colMeta -> {
@ -395,7 +371,7 @@ public class DefaultDataHandler implements DataHandler {
return !colMeta.isTag && !colMeta.isPrimaryKey;
}).map(colMeta -> {
return colMeta.field + "=" + buildSchemalessColumnValue(colMeta, record);
// return colMeta.field + "=" + record.getColumn(indexOf(colMeta.field)).asString();
// return colMeta.field + "=" + record.getColumn(indexOf(colMeta.field)).asString();
}).collect(Collectors.joining(",")))
.append(" ");
// timestamp
@ -512,10 +488,12 @@ public class DefaultDataHandler implements DataHandler {
* insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)])
*/
private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);
List<ColumnMeta> columnMetas = this.schemaCache.getColumnMetaList(table);
StringBuilder sb = new StringBuilder();
sb.append("insert into `").append(table).append("` ")
sb.append("insert into `")
.append(table)
.append("` ")
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return !colMeta.isTag;
}).map(colMeta -> {
@ -527,9 +505,12 @@ public class DefaultDataHandler implements DataHandler {
if (columns.contains("tbname") && !table.equals(record.getColumn(indexOf("tbname")).asString()))
continue;
boolean tagsAllMatch = columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
boolean tagsAllMatch = columnMetas.stream()
.filter(colMeta -> columns.contains(colMeta.field))
.filter(colMeta -> {
return colMeta.isTag;
}).allMatch(colMeta -> {
})
.allMatch(colMeta -> {
Column column = record.getColumn(indexOf(colMeta.field));
boolean equals = equals(column, colMeta);
return equals;
@ -582,20 +563,29 @@ public class DefaultDataHandler implements DataHandler {
* sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...)
*/
private int writeBatchToNormalTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
List<ColumnMeta> columnMetas = this.tbnameColumnMetasMap.get(table);
List<ColumnMeta> columnMetas = this.schemaCache.getColumnMetaList(table);
StringBuilder sb = new StringBuilder();
sb.append("insert into `").append(table)
sb.append("insert into `")
.append(table)
.append("` ")
.append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
.append(columnMetas.stream()
.filter(colMeta -> !colMeta.isTag)
.filter(colMeta -> columns.contains(colMeta.field))
.map(colMeta -> {
return colMeta.field;
}).collect(Collectors.joining(",", "(", ")")))
})
.collect(Collectors.joining(",", "(", ")")))
.append(" values ");
for (Record record : recordBatch) {
sb.append(columnMetas.stream().filter(colMeta -> !colMeta.isTag).filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
sb.append(columnMetas.stream()
.filter(colMeta -> !colMeta.isTag)
.filter(colMeta -> columns.contains(colMeta.field))
.map(colMeta -> {
return buildColumnValue(colMeta, record);
}).collect(Collectors.joining(",", "(", ")")));
})
.collect(Collectors.joining(",", "(", ")")));
}
String sql = sb.toString();

View File

@ -0,0 +1,138 @@
package com.alibaba.datax.plugin.writer.tdengine30writer;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Predicate;
/**
 * Process-wide cache of TDengine 3.x table and column metadata.
 *
 * <p>Implemented as a lazily-initialized singleton (double-checked locking on
 * the volatile {@code instance} field) so that all writer tasks in this JVM
 * share one metadata load instead of querying the server per task.
 *
 * <p>Table metadata is loaded eagerly in the constructor; column metadata is
 * loaded on first access per table in {@link #getColumnMetaList(String)}.
 */
public final class SchemaCache {
    // Log under this class (the original logged under TDengineWriter.Job.class,
    // which misattributes every log line emitted here).
    private static final Logger log = LoggerFactory.getLogger(SchemaCache.class);

    private static volatile SchemaCache instance;

    // Assigned exactly once by the private constructor; effectively immutable
    // afterwards. The shared Connection is held for the life of the process.
    private static Configuration config;
    private static Connection conn;

    // table name -> TableMeta (populated eagerly in the constructor)
    private static final Map<String, TableMeta> tableMetas = new LinkedHashMap<>();
    // table name -> List<ColumnMeta>; an empty list means "not loaded yet"
    private static final Map<String, List<ColumnMeta>> columnMetas = new LinkedHashMap<>();

    private SchemaCache(Configuration config) {
        SchemaCache.config = config;

        // Connect using the first (and only supported) "connection" entry.
        final String user = config.getString(Key.USERNAME, Constants.DEFAULT_USERNAME);
        final String pass = config.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD);
        // Named connConfig to avoid shadowing the static Connection field "conn".
        Configuration connConfig = Configuration.from(config.getList(Key.CONNECTION).get(0).toString());
        final String url = connConfig.getString(Key.JDBC_URL);
        try {
            SchemaCache.conn = DriverManager.getConnection(url, user, pass);
        } catch (SQLException e) {
            throw DataXException.asDataXException(
                    "failed to connect to url: " + url + ", cause: {" + e.getMessage() + "}");
        }

        final String dbname = TDengineWriter.parseDatabaseFromJdbcUrl(url);
        SchemaManager schemaManager = new Schema3_0Manager(SchemaCache.conn, dbname);

        // Eagerly load table metadata; register an empty column-meta slot per
        // table so getColumnMetaList can lazily fill it later.
        final List<String> tables = connConfig.getList(Key.TABLE, String.class);
        Map<String, TableMeta> loaded = schemaManager.loadTableMeta(tables);
        SchemaCache.tableMetas.putAll(loaded);
        for (String table : loaded.keySet()) {
            SchemaCache.columnMetas.put(table, new ArrayList<>());
        }
    }

    /**
     * Returns the shared instance, creating it from {@code originConfig} on
     * first call. Subsequent calls ignore the argument.
     */
    public static SchemaCache getInstance(Configuration originConfig) {
        if (instance == null) {
            synchronized (SchemaCache.class) {
                if (instance == null) {
                    instance = new SchemaCache(originConfig);
                }
            }
        }
        return instance;
    }

    /**
     * Returns the cached TableMeta for the given table, or {@code null} if the
     * table was not part of the configured table list.
     */
    public TableMeta getTableMeta(String tableName) {
        return tableMetas.get(tableName);
    }

    /**
     * Returns the column metadata for the given table, loading it from the
     * database on first access (double-checked under the class lock).
     *
     * <p>Only columns named in the job's "column" configuration are kept.
     */
    public List<ColumnMeta> getColumnMetaList(String tbname) {
        List<ColumnMeta> cached = columnMetas.get(tbname);
        if (cached.isEmpty()) {
            synchronized (SchemaCache.class) {
                if (cached.isEmpty()) {
                    List<String> columnNames = config.getList(Key.COLUMN, String.class);
                    // BUG FIX: the original had getColumnMetaListFromDb append
                    // into the cached list and then addAll()-ed that same list
                    // back into itself, duplicating every entry on first load.
                    // getColumnMetaListFromDb now builds a fresh list.
                    cached.addAll(getColumnMetaListFromDb(tbname,
                            colMeta -> columnNames.contains(colMeta.field)));
                }
            }
        }
        return cached;
    }

    /**
     * Queries "describe &lt;table&gt;" and builds the filtered column list.
     * For tag columns, also resolves the tag value from the table.
     */
    private List<ColumnMeta> getColumnMetaListFromDb(String tableName, Predicate<ColumnMeta> filter) {
        List<ColumnMeta> columnMetaList = new ArrayList<>();
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("describe " + tableName)) {
            // The first row returned by "describe" is the primary (timestamp) column.
            for (int i = 0; rs.next(); i++) {
                ColumnMeta columnMeta = buildColumnMeta(rs, i == 0);
                if (filter.test(columnMeta)) {
                    columnMetaList.add(columnMeta);
                }
            }
        } catch (SQLException e) {
            throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
        }
        for (ColumnMeta colMeta : columnMetaList) {
            if (!colMeta.isTag)
                continue;
            colMeta.value = getTagValue(tableName, colMeta.field);
        }
        return columnMetaList;
    }

    /**
     * Reads the value of one tag column from the table.
     *
     * <p>NOTE(review): assumes the table contains at least one row; the
     * rs.next() result is not checked before getObject — TODO confirm empty
     * sub-tables cannot reach this path.
     */
    private Object getTagValue(String tableName, String tagName) {
        String sql = "select " + tagName + " from " + tableName;
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(sql)) {
            rs.next();
            return rs.getObject(tagName);
        } catch (SQLException e) {
            throw DataXException.asDataXException("failed to get tag value, cause: {" + e.getMessage() + "}");
        }
    }

    /**
     * Maps one row of a "describe" result set to a ColumnMeta. The caller
     * decides primary-key status (first row of the result set).
     */
    private ColumnMeta buildColumnMeta(ResultSet rs, boolean isPrimaryKey) throws SQLException {
        ColumnMeta columnMeta = new ColumnMeta();
        columnMeta.field = rs.getString(Constants.COLUMN_META_FIELD);
        columnMeta.type = rs.getString(Constants.COLUMN_META_TYPE);
        columnMeta.length = rs.getInt(Constants.COLUMN_META_LENGTH);
        columnMeta.note = rs.getString(Constants.COLUMN_META_NOTE);
        // A column is a tag iff the "note" field carries the TAG marker.
        columnMeta.isTag = Constants.COLUMN_META_NOTE_TAG.equals(columnMeta.note);
        columnMeta.isPrimaryKey = isPrimaryKey;
        return columnMeta;
    }
}

View File

@ -13,7 +13,6 @@ import java.util.ArrayList;
import java.util.List;
public class TDengineWriter extends Writer {
private static final String PEER_PLUGIN_NAME = "peerPluginName";
public static class Job extends Writer.Job {
@ -29,29 +28,28 @@ public class TDengineWriter extends Writer {
// check username
String user = this.originalConfig.getString(Key.USERNAME);
if (StringUtils.isBlank(user))
throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter ["
+ Key.USERNAME + "] is not set.");
throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.USERNAME + "] is not set.");
// check password
String password = this.originalConfig.getString(Key.PASSWORD);
if (StringUtils.isBlank(password))
throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter ["
+ Key.PASSWORD + "] is not set.");
throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.PASSWORD + "] is not set.");
// check connection
List<Object> connection = this.originalConfig.getList(Key.CONNECTION);
if (connection == null || connection.isEmpty())
throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter ["
+ Key.CONNECTION + "] is not set.");
throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.CONNECTION + "] is not set.");
if (connection.size() > 1)
LOG.warn("connection.size is " + connection.size() + " and only connection[0] will be used.");
Configuration conn = Configuration.from(connection.get(0).toString());
String jdbcUrl = conn.getString(Key.JDBC_URL);
if (StringUtils.isBlank(jdbcUrl))
throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter ["
+ Key.JDBC_URL + "] of connection is not set.");
throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.JDBC_URL + "] of connection is not set.");
// check column
}
@Override
@ -63,14 +61,17 @@ public class TDengineWriter extends Writer {
public List<Configuration> split(int mandatoryNumber) {
List<Configuration> writerSplitConfigs = new ArrayList<>();
List<Object> conns = this.originalConfig.getList(Key.CONNECTION);
for (int i = 0; i < mandatoryNumber; i++) {
Configuration clone = this.originalConfig.clone();
Configuration conf = Configuration.from(conns.get(0).toString());
String jdbcUrl = conf.getString(Key.JDBC_URL);
Configuration config = Configuration.from(
this.originalConfig.getList(Key.CONNECTION).get(0).toString());
String jdbcUrl = config.getString(Key.JDBC_URL);
clone.set(Key.JDBC_URL, jdbcUrl);
clone.set(Key.TABLE, conf.getList(Key.TABLE));
clone.remove(Key.CONNECTION);
clone.set(Key.TABLE, config.getList(Key.TABLE));
writerSplitConfigs.add(clone);
}
@ -81,12 +82,12 @@ public class TDengineWriter extends Writer {
public static class Task extends Writer.Task {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private Configuration writerSliceConfig;
private Configuration writerConfig;
private TaskPluginCollector taskPluginCollector;
@Override
public void init() {
this.writerSliceConfig = getPluginJobConf();
this.writerConfig = getPluginJobConf();
this.taskPluginCollector = super.getTaskPluginCollector();
}
@ -97,18 +98,33 @@ public class TDengineWriter extends Writer {
@Override
public void startWrite(RecordReceiver lineReceiver) {
String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
String peerPluginName = this.writerConfig.getString(PEER_PLUGIN_NAME);
LOG.debug("start to handle record from: " + peerPluginName);
DataHandler handler;
if (peerPluginName.equals("opentsdbreader"))
handler = new OpentsdbDataHandler(this.writerSliceConfig);
handler = new OpentsdbDataHandler(this.writerConfig);
else
handler = new DefaultDataHandler(this.writerSliceConfig, this.taskPluginCollector);
handler = new DefaultDataHandler(this.writerConfig, this.taskPluginCollector);
long records = handler.handle(lineReceiver, getTaskPluginCollector());
LOG.debug("handle data finished, records: " + records);
}
}
/**
 * Extracts the database name from a JDBC URL.
 *
 * @param jdbcUrl URL of the form jdbc:&lt;protocol&gt;://&lt;host&gt;:&lt;port&gt;/&lt;dbname&gt;[?params]
 * @return the database name: the segment after the last '/', truncated at the
 *         first '?' when a query string is present
 */
public static String parseDatabaseFromJdbcUrl(String jdbcUrl) {
    // The database name starts right after the final path separator.
    int dbStart = jdbcUrl.lastIndexOf("/") + 1;
    int queryStart = jdbcUrl.indexOf("?");
    if (queryStart == -1) {
        return jdbcUrl.substring(dbStart);
    }
    return jdbcUrl.substring(dbStart, queryStart);
}
}

View File

@ -33,14 +33,10 @@ public class DefaultDataHandlerTest {
public void writeSupTableBySQL() throws SQLException {
// given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"column\": [\"tbname\", \"ts\", \"f1\", \"f2\", \"t1\"]," +
"\"table\":[\"stb1\"]," +
"\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," +
"\"batchSize\": \"1000\"" +
"}");
Configuration configuration = Configuration.from(
"{" + "\"username\": \"root\"," + "\"password\": \"taosdata\"," +
"\"column\": [\"tbname\", \"ts\", \"f1\", \"f2\", \"t1\"]," + "\"table\":[\"stb1\"]," +
"\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," + "\"batchSize\": \"1000\"" + "}");
long current = System.currentTimeMillis();
List<Record> recordList = IntStream.range(1, 11).mapToObj(i -> {
Record record = new DefaultRecord();
@ -59,8 +55,8 @@ public class DefaultDataHandlerTest {
SchemaManager schemaManager = new SchemaManager(conn);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
handler.setTbnameColumnMetasMap(columnMetas);
//handler.setTableMetas(tableMetas);
//handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);
@ -73,14 +69,10 @@ public class DefaultDataHandlerTest {
public void writeSupTableBySQL_2() throws SQLException {
// given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"column\": [\"tbname\", \"ts\", \"f1\", \"t1\"]," +
"\"table\":[\"stb1\"]," +
"\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," +
"\"batchSize\": \"1000\"" +
"}");
Configuration configuration = Configuration.from(
"{" + "\"username\": \"root\"," + "\"password\": \"taosdata\"," +
"\"column\": [\"tbname\", \"ts\", \"f1\", \"t1\"]," + "\"table\":[\"stb1\"]," +
"\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," + "\"batchSize\": \"1000\"" + "}");
long current = System.currentTimeMillis();
List<Record> recordList = IntStream.range(1, 11).mapToObj(i -> {
Record record = new DefaultRecord();
@ -97,8 +89,8 @@ public class DefaultDataHandlerTest {
SchemaManager schemaManager = new SchemaManager(conn);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
handler.setTbnameColumnMetasMap(columnMetas);
//handler.setTableMetas(tableMetas);
//handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);
@ -111,14 +103,10 @@ public class DefaultDataHandlerTest {
public void writeSupTableBySchemaless() throws SQLException {
// given
createSupTable();
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"column\": [\"ts\", \"f1\", \"f2\", \"t1\"]," +
"\"table\":[\"stb1\"]," +
"\"jdbcUrl\":\"jdbc:TAOS://" + host + ":6030/scm_test\"," +
"\"batchSize\": \"1000\"" +
"}");
Configuration configuration = Configuration.from(
"{" + "\"username\": \"root\"," + "\"password\": \"taosdata\"," +
"\"column\": [\"ts\", \"f1\", \"f2\", \"t1\"]," + "\"table\":[\"stb1\"]," +
"\"jdbcUrl\":\"jdbc:TAOS://" + host + ":6030/scm_test\"," + "\"batchSize\": \"1000\"" + "}");
String jdbcUrl = configuration.getString("jdbcUrl");
Connection connection = DriverManager.getConnection(jdbcUrl, "root", "taosdata");
long current = System.currentTimeMillis();
@ -137,8 +125,8 @@ public class DefaultDataHandlerTest {
SchemaManager schemaManager = new SchemaManager(connection);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
handler.setTbnameColumnMetasMap(columnMetas);
//handler.setTableMetas(tableMetas);
//handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(connection, recordList);
@ -151,14 +139,10 @@ public class DefaultDataHandlerTest {
public void writeSubTableWithTableName() throws SQLException {
// given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"column\": [\"tbname\", \"ts\", \"f1\", \"f2\", \"t1\"]," +
"\"table\":[\"tb1\"]," +
"\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," +
"\"batchSize\": \"1000\"" +
"}");
Configuration configuration = Configuration.from(
"{" + "\"username\": \"root\"," + "\"password\": \"taosdata\"," +
"\"column\": [\"tbname\", \"ts\", \"f1\", \"f2\", \"t1\"]," + "\"table\":[\"tb1\"]," +
"\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," + "\"batchSize\": \"1000\"" + "}");
long current = System.currentTimeMillis();
List<Record> recordList = IntStream.range(1, 11).mapToObj(i -> {
Record record = new DefaultRecord();
@ -176,8 +160,8 @@ public class DefaultDataHandlerTest {
SchemaManager schemaManager = new SchemaManager(conn);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
handler.setTbnameColumnMetasMap(columnMetas);
//handler.setTableMetas(tableMetas);
//handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);
@ -190,15 +174,11 @@ public class DefaultDataHandlerTest {
public void writeSubTableWithoutTableName() throws SQLException {
// given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"column\": [\"ts\", \"f1\", \"f2\", \"t1\"]," +
"\"table\":[\"tb1\"]," +
"\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," +
"\"batchSize\": \"1000\"," +
"\"ignoreTagsUnmatched\": \"true\"" +
"}");
Configuration configuration = Configuration.from(
"{" + "\"username\": \"root\"," + "\"password\": \"taosdata\"," +
"\"column\": [\"ts\", \"f1\", \"f2\", \"t1\"]," + "\"table\":[\"tb1\"]," +
"\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," + "\"batchSize\": \"1000\"," +
"\"ignoreTagsUnmatched\": \"true\"" + "}");
long current = System.currentTimeMillis();
List<Record> recordList = IntStream.range(1, 11).mapToObj(i -> {
Record record = new DefaultRecord();
@ -215,8 +195,8 @@ public class DefaultDataHandlerTest {
SchemaManager schemaManager = new SchemaManager(conn);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
handler.setTbnameColumnMetasMap(columnMetas);
//handler.setTableMetas(tableMetas);
//handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);
@ -229,15 +209,11 @@ public class DefaultDataHandlerTest {
public void writeNormalTable() throws SQLException {
// given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"column\": [\"ts\", \"f1\", \"f2\", \"t1\"]," +
"\"table\":[\"weather\"]," +
"\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," +
"\"batchSize\": \"1000\"," +
"\"ignoreTagsUnmatched\": \"true\"" +
"}");
Configuration configuration = Configuration.from(
"{" + "\"username\": \"root\"," + "\"password\": \"taosdata\"," +
"\"column\": [\"ts\", \"f1\", \"f2\", \"t1\"]," + "\"table\":[\"weather\"]," +
"\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," + "\"batchSize\": \"1000\"," +
"\"ignoreTagsUnmatched\": \"true\"" + "}");
long current = System.currentTimeMillis();
List<Record> recordList = IntStream.range(1, 11).mapToObj(i -> {
Record record = new DefaultRecord();
@ -254,8 +230,8 @@ public class DefaultDataHandlerTest {
SchemaManager schemaManager = new SchemaManager(conn);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
handler.setTbnameColumnMetasMap(columnMetas);
//handler.setTableMetas(tableMetas);
//handler.setTbnameColumnMetasMap(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(conn, recordList);