resolve code review suggestions

This commit is contained in:
zyyang 2022-05-05 10:30:08 +08:00
parent 50c40095f3
commit 1a7a00c7d1
7 changed files with 141 additions and 100 deletions

View File

@ -22,7 +22,7 @@ TDengineReader 通过 TDengine 的 JDBC driver 查询获取数据。
"reader": {
"name": "tdenginereader",
"parameter": {
"user": "root",
"username": "root",
"password": "taosdata",
"connection": [
{
@ -165,24 +165,8 @@ TDengineReader 通过 TDengine 的 JDBC driver 查询获取数据。
#### 4.1.1 数据特征
建表语句:
单行记录类似于:
#### 4.1.2 机器参数
* 执行DataX的机器参数为:
1. cpu:
2. mem:
3. net: 千兆双网卡
4. disc: DataX 数据不落磁盘,不统计此项
* TDengine数据库机器参数为:
1. cpu:
2. mem:
3. net: 千兆双网卡
4. disc:
#### 4.1.3 DataX jvm 参数
-Xms1024m -Xmx1024m -XX:+HeapDumpOnOutOfMemoryError
@ -201,9 +185,6 @@ TDengineReader 通过 TDengine 的 JDBC driver 查询获取数据。
说明:
1. 这里的单表,主键类型为 bigint(20),自增。
2. batchSize 和 通道个数,对性能影响较大。
#### 4.2.4 性能测试小结
1.

View File

@ -93,7 +93,7 @@ public class TDengineReader extends Reader {
}
if (start >= end)
throw DataXException.asDataXException(TDengineReaderErrorCode.ILLEGAL_VALUE,
"The parameter [" + Key.BEGIN_DATETIME + "] should be less than the parameter [" + Key.END_DATETIME + "].");
"The parameter " + Key.BEGIN_DATETIME + ": " + beginDatetime + " should be less than the parameter " + Key.END_DATETIME + ": " + endDatetime + ".");
}
@ -119,7 +119,6 @@ public class TDengineReader extends Reader {
}
}
LOG.info("Configuration: {}", configurations);
return configurations;
}
}
@ -142,15 +141,14 @@ public class TDengineReader extends Reader {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (ClassNotFoundException ignored) {
LOG.warn(ignored.getMessage(), ignored);
}
}
@Override
public void init() {
this.readerSliceConfig = super.getPluginJobConf();
LOG.info("getPluginJobConf: {}", readerSliceConfig);
String user = readerSliceConfig.getString(Key.USERNAME);
String password = readerSliceConfig.getString(Key.PASSWORD);
@ -174,7 +172,12 @@ public class TDengineReader extends Reader {
@Override
public void destroy() {
try {
if (conn != null)
conn.close();
} catch (SQLException e) {
LOG.error(e.getMessage(), e);
}
}
@Override
@ -199,22 +202,15 @@ public class TDengineReader extends Reader {
sqlList.addAll(querySql);
}
try (Statement stmt = conn.createStatement()) {
for (String sql : sqlList) {
for (String sql : sqlList) {
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery(sql);
while (rs.next()) {
Record record = buildRecord(recordSender, rs, mandatoryEncoding);
recordSender.sendToWriter(record);
}
}
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineReaderErrorCode.RUNTIME_EXCEPTION, e.getMessage(), e);
} finally {
try {
if (conn != null)
conn.close();
} catch (SQLException e) {
e.printStackTrace();
LOG.error(e.getMessage(), e);
}
}
}

View File

@ -1,8 +1,6 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
public class Constants {
public static final String DEFAULT_USERNAME = "root";
public static final String DEFAULT_PASSWORD = "taosdata";
public static final int DEFAULT_BATCH_SIZE = 1;
public static final int DEFAULT_BATCH_SIZE = 1000;
public static final boolean DEFAULT_IGNORE_TAGS_UNMATCHED = false;
}

View File

@ -52,14 +52,13 @@ public class DefaultDataHandler implements DataHandler {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
Class.forName("com.taosdata.jdbc.rs.RestfulDriver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
} catch (ClassNotFoundException ignored) {
}
}
public DefaultDataHandler(Configuration configuration, TaskPluginCollector taskPluginCollector) {
this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME);
this.password = configuration.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD);
this.username = configuration.getString(Key.USERNAME);
this.password = configuration.getString(Key.PASSWORD);
this.jdbcUrl = configuration.getString(Key.JDBC_URL);
this.batchSize = configuration.getInt(Key.BATCH_SIZE, Constants.DEFAULT_BATCH_SIZE);
this.tables = configuration.getList(Key.TABLE, String.class);
@ -73,14 +72,15 @@ public class DefaultDataHandler implements DataHandler {
int count = 0;
int affectedRows = 0;
try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established.");
// prepare table_name -> table_meta
this.schemaManager = new SchemaManager(conn);
this.tableMetas = schemaManager.loadTableMeta(tables);
// prepare table_name -> column_meta
this.columnMetas = schemaManager.loadColumnMetas(tables);
if (schemaManager == null) {
// prepare table_name -> table_meta
this.schemaManager = new SchemaManager(conn);
this.tableMetas = schemaManager.loadTableMeta(tables);
// prepare table_name -> column_meta
this.columnMetas = schemaManager.loadColumnMetas(tables);
}
List<Record> recordBatch = new ArrayList<>();
Record record;
@ -89,10 +89,11 @@ public class DefaultDataHandler implements DataHandler {
recordBatch.add(record);
} else {
try {
affectedRows = writeBatch(conn, recordBatch);
} catch (SQLException e) {
recordBatch.add(record);
affectedRows += writeBatch(conn, recordBatch);
} catch (Exception e) {
LOG.warn("use one row insert. because:" + e.getMessage());
affectedRows = writeEachRow(conn, recordBatch);
affectedRows += writeEachRow(conn, recordBatch);
}
recordBatch.clear();
}
@ -101,10 +102,10 @@ public class DefaultDataHandler implements DataHandler {
if (!recordBatch.isEmpty()) {
try {
affectedRows = writeBatch(conn, recordBatch);
} catch (SQLException e) {
affectedRows += writeBatch(conn, recordBatch);
} catch (Exception e) {
LOG.warn("use one row insert. because:" + e.getMessage());
affectedRows = writeEachRow(conn, recordBatch);
affectedRows += writeEachRow(conn, recordBatch);
}
recordBatch.clear();
}
@ -126,8 +127,8 @@ public class DefaultDataHandler implements DataHandler {
recordList.add(record);
try {
affectedRows += writeBatch(conn, recordList);
} catch (SQLException e) {
LOG.error(e.getMessage());
} catch (Exception e) {
LOG.error(e.getMessage(), e);
this.taskPluginCollector.collectDirtyRecord(record, e);
}
}
@ -145,7 +146,7 @@ public class DefaultDataHandler implements DataHandler {
* 3. 对于tb拼sql例如data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => insert into tb(ts, f1, f2) values(ts, f1, f2)
* 4. 对于t拼sql例如data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, f3, t1, t2] insert into t(ts, f1, f2, f3, t1, t2) values(ts, f1, f2, f3, t1, t2)
*/
public int writeBatch(Connection conn, List<Record> recordBatch) throws SQLException {
public int writeBatch(Connection conn, List<Record> recordBatch) throws Exception {
int affectedRows = 0;
for (String table : tables) {
TableMeta tableMeta = tableMetas.get(table);
@ -173,31 +174,62 @@ public class DefaultDataHandler implements DataHandler {
* record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
* record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
*/
private int writeBatchToSupTableBySQL(Connection conn, String table, List<Record> recordBatch) throws SQLException {
private int writeBatchToSupTableBySQL(Connection conn, String table, List<Record> recordBatch) throws Exception {
List<ColumnMeta> columnMetas = this.columnMetas.get(table);
StringBuilder sb = new StringBuilder("insert into");
for (Record record : recordBatch) {
sb.append(" ").append(record.getColumn(indexOf("tbname")).asString())
.append(" using ").append(table)
.append(" tags")
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return colMeta.isTag;
}).map(colMeta -> {
return buildColumnValue(colMeta, record);
}).collect(Collectors.joining(",", "(", ")")))
.append(" ")
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
.append(" tags");
// sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
// return colMeta.isTag;
// }).map(colMeta -> {
// return buildColumnValue(colMeta, record);
// }).collect(Collectors.joining(",", "(", ")")));
sb.append("(");
for (int i = 0; i < columns.size(); i++) {
ColumnMeta colMeta = columnMetas.get(i);
if (!columns.contains(colMeta.field))
continue;
if (!colMeta.isTag)
continue;
String tagValue = buildColumnValue(colMeta, record);
if (i == 0) {
sb.append(tagValue);
} else {
sb.append(",").append(tagValue);
}
}
sb.append(")");
sb.append(" ").append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return !colMeta.isTag;
}).map(colMeta -> {
return colMeta.field;
}).collect(Collectors.joining(",", "(", ")")))
.append(" values")
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return !colMeta.isTag;
}).map(colMeta -> {
return buildColumnValue(colMeta, record);
}).collect(Collectors.joining(",", "(", ")")));
.append(" values");
// sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
// return !colMeta.isTag;
// }).map(colMeta -> {
// return buildColumnValue(colMeta, record);
// }).collect(Collectors.joining(",", "(", ")")));
sb.append("(");
for (int i = 0; i < columnMetas.size(); i++) {
ColumnMeta colMeta = columnMetas.get(i);
if (!columns.contains(colMeta.field))
continue;
if (colMeta.isTag)
continue;
String colValue = buildColumnValue(colMeta, record);
if (i == 0) {
sb.append(colValue);
} else {
sb.append(",").append(colValue);
}
}
sb.append(")");
}
String sql = sb.toString();
@ -213,10 +245,11 @@ public class DefaultDataHandler implements DataHandler {
return count;
}
private String buildColumnValue(ColumnMeta colMeta, Record record) {
private String buildColumnValue(ColumnMeta colMeta, Record record) throws Exception {
Column column = record.getColumn(indexOf(colMeta.field));
TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
switch (column.getType()) {
Column.Type type = column.getType();
switch (type) {
case DATE: {
Date value = column.asDate();
switch (timestampPrecision) {
@ -243,8 +276,9 @@ public class DefaultDataHandler implements DataHandler {
case DOUBLE:
case INT:
case LONG:
column.asString();
default:
return column.asString();
throw new Exception("invalid column type: " + type);
}
}
@ -392,7 +426,7 @@ public class DefaultDataHandler implements DataHandler {
* else
* insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)])
*/
private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) throws Exception {
List<ColumnMeta> columnMetas = this.columnMetas.get(table);
StringBuilder sb = new StringBuilder();
@ -419,11 +453,25 @@ public class DefaultDataHandler implements DataHandler {
if (ignoreTagsUnmatched && !tagsAllMatch)
continue;
sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return !colMeta.isTag;
}).map(colMeta -> {
return buildColumnValue(colMeta, record);
}).collect(Collectors.joining(", ", "(", ") ")));
// sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
// return !colMeta.isTag;
// }).map(colMeta -> {
// return buildColumnValue(colMeta, record);
// }).collect(Collectors.joining(", ", "(", ") ")));
sb.append("(");
for (int i = 0; i < columnMetas.size(); i++) {
ColumnMeta colMeta = columnMetas.get(i);
if (colMeta.isTag)
continue;
String colValue = buildColumnValue(colMeta, record);
if (i == 0) {
sb.append(colValue);
} else {
sb.append(",").append(colValue);
}
}
sb.append(")");
validRecords++;
}
@ -462,7 +510,7 @@ public class DefaultDataHandler implements DataHandler {
* table: ["weather"], column: ["ts, f1, f2, f3, t1, t2"]
* sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...)
*/
private int writeBatchToNormalTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
private int writeBatchToNormalTable(Connection conn, String table, List<Record> recordBatch) throws Exception {
List<ColumnMeta> columnMetas = this.columnMetas.get(table);
StringBuilder sb = new StringBuilder();
@ -474,9 +522,22 @@ public class DefaultDataHandler implements DataHandler {
.append(" values ");
for (Record record : recordBatch) {
sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
return buildColumnValue(colMeta, record);
}).collect(Collectors.joining(",", "(", ")")));
// sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
// return buildColumnValue(colMeta, record);
// }).collect(Collectors.joining(",", "(", ")")));
sb.append("(");
for (int i = 0; i < columnMetas.size(); i++) {
ColumnMeta colMeta = columnMetas.get(i);
if (!columns.contains(colMeta.field))
continue;
String colValue = buildColumnValue(colMeta, record);
if (i == 0) {
sb.append(colValue);
} else {
sb.append(",").append(colValue);
}
}
sb.append(")");
}
String sql = sb.toString();

View File

@ -10,6 +10,7 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
import java.util.stream.Collectors;
public class SchemaManager {
private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class);
@ -123,12 +124,14 @@ public class SchemaManager {
}
}
} catch (SQLException e) {
e.printStackTrace();
LOG.error(e.getMessage(), e);
}
colMeta.value = value;
});
LOG.debug("load column metadata of " + table + ": " + Arrays.toString(columnMetaList.toArray()));
LOG.debug("load column metadata of " + table + ": " +
columnMetaList.stream().map(ColumnMeta::toString).collect(Collectors.joining(",", "[", "]"))
);
ret.put(table, columnMetaList);
}
return ret;
@ -142,7 +145,9 @@ public class SchemaManager {
tableMeta.tags = rs.getInt("tags");
tableMeta.tables = rs.getInt("tables");
LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta);
if (LOG.isDebugEnabled()){
LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta);
}
return tableMeta;
}

View File

@ -4,10 +4,10 @@ import com.alibaba.datax.common.spi.ErrorCode;
public enum TDengineWriterErrorCode implements ErrorCode {
REQUIRED_VALUE("TDengineWriter-00", "缺失必要的值"),
ILLEGAL_VALUE("TDengineWriter-01", "值非法"),
RUNTIME_EXCEPTION("TDengineWriter-02", "运行时异常"),
TYPE_ERROR("TDengineWriter-03", "Datax类型无法正确映射到TDengine类型");
REQUIRED_VALUE("TDengineWriter-00", "parameter value is missing"),
ILLEGAL_VALUE("TDengineWriter-01", "invalid parameter value"),
RUNTIME_EXCEPTION("TDengineWriter-02", "runtime exception"),
TYPE_ERROR("TDengineWriter-03", "data type mapping error");
private final String code;
private final String description;

View File

@ -28,7 +28,7 @@ public class DefaultDataHandlerTest {
private final TaskPluginCollector taskPluginCollector = new TDengineWriter.Task().getTaskPluginCollector();
@Test
public void writeSupTableBySQL() throws SQLException {
public void writeSupTableBySQL() throws Exception {
// given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" +
@ -68,7 +68,7 @@ public class DefaultDataHandlerTest {
}
@Test
public void writeSupTableBySQL_2() throws SQLException {
public void writeSupTableBySQL_2() throws Exception {
// given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" +
@ -106,7 +106,7 @@ public class DefaultDataHandlerTest {
}
@Test
public void writeSupTableBySchemaless() throws SQLException {
public void writeSupTableBySchemaless() throws Exception {
// given
createSupTable();
Configuration configuration = Configuration.from("{" +
@ -146,7 +146,7 @@ public class DefaultDataHandlerTest {
}
@Test
public void writeSubTableWithTableName() throws SQLException {
public void writeSubTableWithTableName() throws Exception {
// given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" +
@ -185,7 +185,7 @@ public class DefaultDataHandlerTest {
}
@Test
public void writeSubTableWithoutTableName() throws SQLException {
public void writeSubTableWithoutTableName() throws Exception {
// given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" +
@ -224,7 +224,7 @@ public class DefaultDataHandlerTest {
}
@Test
public void writeNormalTable() throws SQLException {
public void writeNormalTable() throws Exception {
// given
createSupAndSubTable();
Configuration configuration = Configuration.from("{" +