tdenginewriter rebuilt

zyyang 2022-02-14 18:58:17 +08:00
parent b28ca9c0a8
commit df3ea169b0
29 changed files with 1375 additions and 884 deletions

View File

@ -32,7 +32,7 @@
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.34</version>
<version>2.0.37</version>
</dependency>
<dependency>

View File

@ -7,7 +7,7 @@ public class Key {
// public static final String PORT = "port";
// public static final String DB = "db";
public static final String TABLE = "table";
public static final String USER = "user";
public static final String USER = "username";
public static final String PASSWORD = "password";
public static final String CONNECTION = "connection";
// public static final String SQL = "sql";

View File

@ -190,8 +190,9 @@ public class TDengineReader extends Reader {
@Override
public void startRead(RecordSender recordSender) {
try (Statement stmt = conn.createStatement()) {
for (int i = 0; i < tables.size(); i++) {
String sql = "select " + StringUtils.join(columns, ",") + " from " + tables.get(i) + " where _c0 >= " + startTime + " and _c0 < " + endTime;
for (String table : tables) {
String sql = "select " + StringUtils.join(columns, ",") + " from " + table
+ " where _c0 >= " + startTime + " and _c0 < " + endTime;
ResultSet rs = stmt.executeQuery(sql);
ResultSetMetaData metaData = rs.getMetaData();
while (rs.next()) {

View File

@ -19,11 +19,25 @@
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.58</version>
</dependency>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.34</version>
<version>2.0.37</version>
<exclusions>
<exclusion>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
@ -36,18 +50,18 @@
</exclusions>
</dependency>
<dependency>
<groupId>com.taosdata.jdbc</groupId>
<artifactId>taos-jdbcdriver</artifactId>
<version>2.0.34</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>${junit-version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>

View File

@ -0,0 +1,24 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
public class ColumnMeta {
String field;
String type;
int length;
String note;
boolean isTag;
boolean isPrimaryKey;
Object value;
@Override
public String toString() {
return "ColumnMeta{" +
"field='" + field + '\'' +
", type='" + type + '\'' +
", length=" + length +
", note='" + note + '\'' +
", isTag=" + isTag +
", isPrimaryKey=" + isPrimaryKey +
", value=" + value +
'}';
}
}

View File

@ -0,0 +1,8 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
public class Constants {
public static final String DEFAULT_USERNAME = "root";
public static final String DEFAULT_PASSWORD = "taosdata";
public static final int DEFAULT_BATCH_SIZE = 1;
public static final boolean DEFAULT_IGNORE_TAGS_UNMATCHED = false;
}

View File

@ -1,12 +1,9 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import java.util.Properties;
public interface DataHandler {
long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector);
int handle(RecordReceiver lineReceiver, TaskPluginCollector collector);
}

View File

@ -1,10 +0,0 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
public class DataHandlerFactory {
public static DataHandler build(String peerPluginName) {
if (peerPluginName.equals("opentsdbreader"))
return new OpentsdbDataHandler();
return new DefaultDataHandler();
}
}

View File

@ -1,108 +1,423 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.taosdata.jdbc.TSDBDriver;
import com.taosdata.jdbc.TSDBPreparedStatement;
import com.alibaba.datax.common.util.Configuration;
import com.taosdata.jdbc.SchemalessWriter;
import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;
import com.taosdata.jdbc.utils.Utils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Properties;
import java.sql.*;
import java.util.ArrayList;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Default DataHandler
*/
public class DefaultDataHandler implements DataHandler {
private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
static {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
private String username;
private String password;
private String jdbcUrl;
private int batchSize;
private boolean ignoreTagsUnmatched;
private List<String> tables;
private List<String> columns;
private Map<String, TableMeta> tableMetas;
private SchemaManager schemaManager;
public void setTableMetas(Map<String, TableMeta> tableMetas) {
this.tableMetas = tableMetas;
}
public void setColumnMetas(Map<String, List<ColumnMeta>> columnMetas) {
this.columnMetas = columnMetas;
}
public void setSchemaManager(SchemaManager schemaManager) {
this.schemaManager = schemaManager;
}
private Map<String, List<ColumnMeta>> columnMetas;
public DefaultDataHandler(Configuration configuration) {
this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME);
this.password = configuration.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD);
this.jdbcUrl = configuration.getString(Key.JDBC_URL);
this.batchSize = configuration.getInt(Key.BATCH_SIZE, Constants.DEFAULT_BATCH_SIZE);
this.tables = configuration.getList(Key.TABLE, String.class);
this.columns = configuration.getList(Key.COLUMN, String.class);
this.ignoreTagsUnmatched = configuration.getBool(Key.IGNORE_TAGS_UNMATCHED, Constants.DEFAULT_IGNORE_TAGS_UNMATCHED);
}
@Override
public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) {
SchemaManager schemaManager = new SchemaManager(properties);
if (!schemaManager.configValid()) {
return 0;
}
public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) {
int count = 0;
int affectedRows = 0;
try {
Connection conn = getTaosConnection(properties);
if (conn == null) {
return 0;
}
if (schemaManager.shouldGuessSchema()) {
// cannot get the table schema from the config file, try to get it from the database
LOG.info(Msg.get("try_get_schema_from_db"));
boolean success = schemaManager.getFromDB(conn);
if (!success) {
return 0;
try (Connection conn = DriverManager.getConnection(jdbcUrl, username, password)) {
LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + username + "] established.");
// prepare table_name -> table_meta
this.schemaManager = new SchemaManager(conn);
this.tableMetas = schemaManager.loadTableMeta(tables);
// prepare table_name -> column_meta
this.columnMetas = schemaManager.loadColumnMetas(tables);
List<Record> recordBatch = new ArrayList<>();
Record record;
for (int i = 1; (record = lineReceiver.getFromReader()) != null; i++) {
if (i % batchSize != 0) {
recordBatch.add(record);
} else {
affectedRows = writeBatch(conn, recordBatch);
recordBatch.clear();
}
} else {
count++;
}
int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, "1000"));
if (batchSize < 5) {
// a batchSize that is too small increases the chance of automatic type-inference errors; increase it and retry
LOG.error(Msg.get("batch_size_too_small"));
return 0;
}
return write(lineReceiver, conn, batchSize, schemaManager, collector);
} catch (Exception e) {
LOG.error("write failed " + e.getMessage());
e.printStackTrace();
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
return 0;
}
private Connection getTaosConnection(Properties properties) throws SQLException {
// check required parameters
String host = properties.getProperty(Key.HOST);
String port = properties.getProperty(Key.PORT);
String dbname = properties.getProperty(Key.DBNAME);
String user = properties.getProperty(Key.USER);
String password = properties.getProperty(Key.PASSWORD);
if (host == null || port == null || dbname == null || user == null || password == null) {
String keys = String.join(" ", Key.HOST, Key.PORT, Key.DBNAME, Key.USER, Key.PASSWORD);
LOG.error("Required options missing, please check: " + keys);
return null;
if (affectedRows != count) {
LOG.error("write record missing or incorrect happened, affectedRows: " + affectedRows + ", total: " + count);
}
String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", host, port, dbname, user, password);
Properties connProps = new Properties();
connProps.setProperty(TSDBDriver.PROPERTY_KEY_CHARSET, "UTF-8");
LOG.info("TDengine connection established, host:{} port:{} dbname:{} user:{}", host, port, dbname, user);
return DriverManager.getConnection(jdbcUrl, connProps);
return affectedRows;
}
/**
* Batch write using SQL.<br/>
*
* @return number of records successfully written
* @throws SQLException
* table: [ "stb1", "stb2", "tb1", "tb2", "t1" ]
* stb1[ts,f1,f2] tags:[t1]
* stb2[ts,f1,f2,f3] tags:[t1,t2]
* 1. tables fall into three types: stb (super table) / tb (sub table) / t (original table)
* 2. for stb, auto-create the sub table / use schemaless:
* 2.1: data contains a tbname field, e.g. data: [ts, f1, f2, f3, t1, t2, tbname] tbColumn: [ts, f1, f2, t1] => insert into tbname using stb1 tags(t1) values(ts, f1, f2)
* 2.2: data contains no tbname field, e.g. data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => schemaless: stb1,t1=t1 f1=f1,f2=f2 ts, no batch write
* 3. for tb, build the SQL, e.g. data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => insert into tb(ts, f1, f2) values(ts, f1, f2)
* 4. for t, build the SQL, e.g. data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, f3, t1, t2] => insert into t(ts, f1, f2, f3, t1, t2) values(ts, f1, f2, f3, t1, t2)
*/
private long write(RecordReceiver lineReceiver, Connection conn, int batchSize, SchemaManager scm, TaskPluginCollector collector) throws SQLException {
Record record = lineReceiver.getFromReader();
if (record == null) {
return 0;
public int writeBatch(Connection conn, List<Record> recordBatch) {
int affectedRows = 0;
for (String table : tables) {
TableMeta tableMeta = tableMetas.get(table);
switch (tableMeta.tableType) {
case SUP_TABLE: {
if (columns.contains("tbname"))
affectedRows += writeBatchToSupTableBySQL(conn, table, recordBatch);
else
affectedRows += writeBatchToSupTableBySchemaless(conn, table, recordBatch);
}
break;
case SUB_TABLE:
affectedRows += writeBatchToSubTable(conn, table, recordBatch);
break;
case NML_TABLE:
default:
affectedRows += writeBatchToNormalTable(conn, table, recordBatch);
}
}
String pq = String.format("INSERT INTO ? USING %s TAGS(%s) (%s) values (%s)", scm.getStable(), scm.getTagValuesPlaceHolder(), scm.getJoinedFieldNames(), scm.getFieldValuesPlaceHolder());
LOG.info("Prepared SQL: {}", pq);
try (TSDBPreparedStatement stmt = (TSDBPreparedStatement) conn.prepareStatement(pq)) {
JDBCBatchWriter batchWriter = new JDBCBatchWriter(conn, stmt, scm, batchSize, collector);
do {
batchWriter.append(record);
} while ((record = lineReceiver.getFromReader()) != null);
batchWriter.flush();
return batchWriter.getCount();
return affectedRows;
}
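For illustration (not part of this commit), a minimal sketch of the statement shapes the dispatch above produces; all table, column, tag names and values (stb1, tb1, t1, ts, f1, f2, 'beijing') are invented examples:
// Illustrative sketch only: the statement shapes described in the javadoc above.
public class StatementShapes {
    public static void main(String[] args) {
        // super table + tbname column => auto-create sub table via USING ... TAGS
        System.out.println("insert into tb11 using stb1 tags('beijing') (ts,f1,f2) values(now,1,2)");
        // super table without tbname => one schemaless line per record, no batching
        System.out.println("stb1,t1=beijing f1=1i32,f2=2i32 1644830000000");
        // sub table => plain insert; tags are fixed by the sub table itself
        System.out.println("insert into tb1 (ts,f1,f2) values(now,1,2)");
        // normal table => all configured columns, tags written as ordinary columns
        System.out.println("insert into t1 (ts,f1,f2,t1) values(now,1,2,'beijing')");
    }
}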
/**
* insert into record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
* record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
* record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
*/
private int writeBatchToSupTableBySQL(Connection conn, String table, List<Record> recordBatch) {
List<ColumnMeta> columnMetas = this.columnMetas.get(table);
StringBuilder sb = new StringBuilder("insert into");
for (Record record : recordBatch) {
sb.append(" ").append(record.getColumn(indexOf("tbname")).asString())
.append(" using ").append(table)
.append(" tags")
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return colMeta.isTag;
}).map(colMeta -> {
return buildColumnValue(colMeta, record);
}).collect(Collectors.joining(",", "(", ")")))
.append(" ")
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return !colMeta.isTag;
}).map(colMeta -> {
return colMeta.field;
}).collect(Collectors.joining(",", "(", ")")))
.append(" values")
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return !colMeta.isTag;
}).map(colMeta -> {
return buildColumnValue(colMeta, record);
}).collect(Collectors.joining(",", "(", ")")));
}
String sql = sb.toString();
return executeUpdate(conn, sql);
}
private int executeUpdate(Connection conn, String sql) throws DataXException {
int count;
try (Statement stmt = conn.createStatement()) {
LOG.debug(">>> " + sql);
count = stmt.executeUpdate(sql);
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
return count;
}
private String buildColumnValue(ColumnMeta colMeta, Record record) {
Column column = record.getColumn(indexOf(colMeta.field));
switch (column.getType()) {
case DATE:
return "'" + column.asString() + "'";
case BYTES:
case STRING:
if (colMeta.type.equals("TIMESTAMP"))
return "\"" + column.asString() + "\"";
String value = column.asString();
return "\'" + Utils.escapeSingleQuota(value) + "\'";
case NULL:
case BAD:
return "NULL";
case BOOL:
case DOUBLE:
case INT:
case LONG:
default:
return column.asString();
}
}
/**
* table: ["stb1"], column: ["ts", "f1", "f2", "t1"]
* data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => schemaless: stb1,t1=t1 f1=f1,f2=f2 ts
*/
private int writeBatchToSupTableBySchemaless(Connection conn, String table, List<Record> recordBatch) {
int count = 0;
TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
List<ColumnMeta> columnMetaList = this.columnMetas.get(table);
ColumnMeta ts = columnMetaList.stream().filter(colMeta -> colMeta.isPrimaryKey).findFirst().get();
List<String> lines = new ArrayList<>();
for (Record record : recordBatch) {
StringBuilder sb = new StringBuilder();
sb.append(table).append(",")
.append(columnMetaList.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return colMeta.isTag;
}).map(colMeta -> {
String value = record.getColumn(indexOf(colMeta.field)).asString();
if (value.contains(" "))
value = value.replace(" ", "\\ ");
return colMeta.field + "=" + value;
}).collect(Collectors.joining(",")))
.append(" ")
.append(columnMetaList.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return !colMeta.isTag && !colMeta.isPrimaryKey;
}).map(colMeta -> {
return colMeta.field + "=" + buildSchemalessColumnValue(colMeta, record);
// return colMeta.field + "=" + record.getColumn(indexOf(colMeta.field)).asString();
}).collect(Collectors.joining(",")))
.append(" ");
// timestamp
Column column = record.getColumn(indexOf(ts.field));
Object tsValue = column.getRawData();
if (column.getType() == Column.Type.DATE && tsValue instanceof Date) {
long time = column.asDate().getTime();
switch (timestampPrecision) {
case NANOSEC:
sb.append(time * 1000000);
break;
case MICROSEC:
sb.append(time * 1000);
break;
case MILLISEC:
default:
sb.append(time);
}
} else if (column.getType() == Column.Type.STRING) {
sb.append(Utils.parseTimestamp(column.asString()));
} else {
sb.append(column.asLong());
}
String line = sb.toString();
LOG.debug(">>> " + line);
lines.add(line);
count++;
}
SchemalessWriter writer = new SchemalessWriter(conn);
SchemalessTimestampType timestampType;
switch (timestampPrecision) {
case NANOSEC:
timestampType = SchemalessTimestampType.NANO_SECONDS;
break;
case MICROSEC:
timestampType = SchemalessTimestampType.MICRO_SECONDS;
break;
case MILLISEC:
timestampType = SchemalessTimestampType.MILLI_SECONDS;
break;
default:
timestampType = SchemalessTimestampType.NOT_CONFIGURED;
}
try {
writer.write(lines, SchemalessProtocolType.LINE, timestampType);
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
LOG.warn("schemalessWriter does not return affected rows!");
return count;
}
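A rough, self-contained sketch of the line assembly above; the measurement, tag, and field values are invented, and the precision scaling mirrors the switch on the database precision:
// Hypothetical example of one schemaless line as built by writeBatchToSupTableBySchemaless:
// measurement (super table), comma-joined tags, space, comma-joined fields, space, timestamp.
public class LineProtocolSketch {
    public static void main(String[] args) {
        long millis = System.currentTimeMillis();
        long micros = millis * 1000; // database precision "us" => scale milliseconds by 1000
        String line = "stb1"             // super table name
                + ",t1=beijing"          // tag columns (spaces escaped as "\ ")
                + " f1=1i32,f2=2.0f64"   // field columns with typed-literal suffixes
                + " " + micros;          // timestamp in database precision
        System.out.println(line);
    }
}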
private long dateAsLong(Column column) {
TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
long time = column.asDate().getTime();
switch (timestampPrecision) {
case NANOSEC:
return time * 1000000;
case MICROSEC:
return time * 1000;
case MILLISEC:
default:
return time;
}
}
private String buildSchemalessColumnValue(ColumnMeta colMeta, Record record) {
Column column = record.getColumn(indexOf(colMeta.field));
switch (column.getType()) {
case DATE:
if (colMeta.type.equals("TIMESTAMP"))
return dateAsLong(column) + "i64";
return "L'" + column.asString() + "'";
case BYTES:
case STRING:
if (colMeta.type.equals("TIMESTAMP"))
return column.asString() + "i64";
String value = column.asString();
if (colMeta.type.startsWith("BINARY"))
return "'" + Utils.escapeSingleQuota(value) + "'";
if (colMeta.type.startsWith("NCHAR"))
return "L'" + Utils.escapeSingleQuota(value) + "'";
case NULL:
case BAD:
return "NULL";
case DOUBLE: {
if (colMeta.type.equals("FLOAT"))
return column.asString() + "f32";
if (colMeta.type.equals("DOUBLE"))
return column.asString() + "f64";
}
case INT:
case LONG: {
if (colMeta.type.equals("TINYINT"))
return column.asString() + "i8";
if (colMeta.type.equals("SMALLINT"))
return column.asString() + "i16";
if (colMeta.type.equals("INT"))
return column.asString() + "i32";
if (colMeta.type.equals("BIGINT"))
return column.asString() + "i64";
}
case BOOL:
default:
return column.asString();
}
}
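For reference, a small sketch of the typed-literal suffixes the method above emits; the mapping is taken from the switch, while the sample values are invented:
// Quick reference for the suffixes produced by buildSchemalessColumnValue.
import java.util.LinkedHashMap;
import java.util.Map;
public class SuffixReference {
    public static void main(String[] args) {
        Map<String, String> examples = new LinkedHashMap<>();
        examples.put("TINYINT", "5i8");
        examples.put("SMALLINT", "5i16");
        examples.put("INT", "5i32");
        examples.put("BIGINT / TIMESTAMP", "5i64");
        examples.put("FLOAT", "5.0f32");
        examples.put("DOUBLE", "5.0f64");
        examples.put("BINARY", "'single-quoted, single quotes escaped'");
        examples.put("NCHAR", "L'single-quoted, single quotes escaped'");
        examples.forEach((type, literal) -> System.out.println(type + " -> " + literal));
    }
}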
/**
* table: ["tb1"], column: [tbname, ts, f1, f2, t1]
* if contains("tbname") and tbname != tb1 continue;
* else if t1 != record[idx(t1)] or t2 != record[idx(t2)]... continue;
* else
* insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)])
*/
private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) {
List<ColumnMeta> columnMetas = this.columnMetas.get(table);
StringBuilder sb = new StringBuilder();
sb.append("insert into ").append(table).append(" ")
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return !colMeta.isTag;
}).map(colMeta -> {
return colMeta.field;
}).collect(Collectors.joining(",", "(", ")")))
.append(" values");
int validRecords = 0;
for (Record record : recordBatch) {
if (columns.contains("tbname") && !table.equals(record.getColumn(indexOf("tbname")).asString()))
continue;
boolean tagsAllMatch = columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return colMeta.isTag;
}).allMatch(colMeta -> {
return record.getColumn(indexOf(colMeta.field)).asString().equals(colMeta.value.toString());
});
if (ignoreTagsUnmatched && !tagsAllMatch)
continue;
sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).filter(colMeta -> {
return !colMeta.isTag;
}).map(colMeta -> {
return buildColumnValue(colMeta, record);
}).collect(Collectors.joining(", ", "(", ") ")));
validRecords++;
}
if (validRecords == 0) {
LOG.warn("no valid records in this batch");
return 0;
}
String sql = sb.toString();
return executeUpdate(conn, sql);
}
/**
* table: ["weather"], column: ["ts, f1, f2, f3, t1, t2"]
* sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...)
*/
private int writeBatchToNormalTable(Connection conn, String table, List<Record> recordBatch) {
List<ColumnMeta> columnMetas = this.columnMetas.get(table);
StringBuilder sb = new StringBuilder();
sb.append("insert into ").append(table)
.append(" ")
.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
return colMeta.field;
}).collect(Collectors.joining(",", "(", ")")))
.append(" values ");
for (Record record : recordBatch) {
sb.append(columnMetas.stream().filter(colMeta -> columns.contains(colMeta.field)).map(colMeta -> {
return buildColumnValue(colMeta, record);
}).collect(Collectors.joining(",", "(", ")")));
}
String sql = sb.toString();
return executeUpdate(conn, sql);
}
private int indexOf(String colName) throws DataXException {
for (int i = 0; i < columns.size(); i++) {
if (columns.get(i).equals(colName))
return i;
}
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION,
"cannot find col: " + colName + " in columns: " + columns);
}
}

View File

@ -1,244 +0,0 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.taosdata.jdbc.TSDBPreparedStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
/**
* Batch write via the native JDBC write interface.<br/>
* Two constraints make the batch-write logic complex enough to justify a dedicated class:<br/>
* 1. callers must collect the records to be batch-written into an ArrayList in advance
* 2. every record in a batch must target the same table
* The implementation logic of this class:
* 1. records belonging to the same sub table are buffered first
* 2. when a buffer reaches the batchSize threshold, a batch write is triggered automatically
* 3. the final batch is only written when the caller calls flush explicitly
*/
public class JDBCBatchWriter {
public static final Logger LOG = LoggerFactory.getLogger(JDBCBatchWriter.class);
private TSDBPreparedStatement stmt;
private SchemaManager scm;
private Connection conn;
private int batchSize;
private TaskPluginCollector collector;
// buffered records, keyed by tableName
Map<String, List<Record>> buf = new HashMap<>();
// buffered tag values of each table, keyed by tableName
Map<String, String[]> tableTagValues = new HashMap<>();
private long sucCount = 0;
private final int tsColIndex;
private List<String> fieldList;
// minimum number of columns each record should contain, used for validation
private int minColNum = 0;
private Map<String, Integer> fieldIndexMap;
private List<Column.Type> fieldTypes = null;
public JDBCBatchWriter(Connection conn, TSDBPreparedStatement stmt, SchemaManager scm, int batchSize, TaskPluginCollector collector) {
this.conn = conn;
this.stmt = stmt;
this.scm = scm;
this.batchSize = batchSize;
this.collector = collector;
this.tsColIndex = scm.getTsColIndex();
this.fieldList = scm.getFieldList();
this.fieldIndexMap = scm.getFieldIndexMap();
this.minColNum = 1 + fieldList.size() + scm.getDynamicTagCount();
}
public void initFiledTypesAndTargetTable(List<Record> records) throws SQLException {
if (fieldTypes != null) {
return;
}
guessFieldTypes(records);
if (scm.shouldCreateTable()) {
scm.createSTable(conn, fieldTypes);
}
}
public void append(Record record) throws SQLException {
int columnNum = record.getColumnNumber();
if (columnNum < minColNum) {
// actual column count is less than expected
collector.collectDirtyRecord(record, Msg.get("column_number_error"));
return;
}
String[] tagValues = scm.getTagValuesFromRecord(record);
if (tagValues == null) {
// tag columns contain null
collector.collectDirtyRecord(record, Msg.get("tag_value_error"));
return;
}
if (!scm.hasTimestamp(record)) {
// timestamp column is null or has a wrong type
collector.collectDirtyRecord(record, Msg.get("ts_value_error"));
return;
}
String tableName = scm.computeTableName(tagValues);
if (buf.containsKey(tableName)) {
List<Record> lis = buf.get(tableName);
lis.add(record);
if (lis.size() == batchSize) {
if (fieldTypes == null) {
initFiledTypesAndTargetTable(lis);
}
executeBatch(tableName);
lis.clear();
}
} else {
List<Record> lis = new ArrayList<>(batchSize);
lis.add(record);
buf.put(tableName, lis);
tableTagValues.put(tableName, tagValues);
}
}
/**
* Only the String type is special: testing shows that a column whose value is null is converted to the String type, so a Column of type String does not necessarily mean the column really is a String.
*
* @param records
*/
private void guessFieldTypes(List<Record> records) {
fieldTypes = new ArrayList<>(fieldList.size());
for (int i = 0; i < fieldList.size(); ++i) {
int colIndex = fieldIndexMap.get(fieldList.get(i));
boolean ok = false;
for (int j = 0; j < records.size() && !ok; ++j) {
Column column = records.get(j).getColumn(colIndex);
Column.Type type = column.getType();
switch (type) {
case LONG:
case DOUBLE:
case DATE:
case BOOL:
case BYTES:
if (column.getRawData() != null) {
fieldTypes.add(type);
ok = true;
}
break;
case STRING:
// only non-null, non-empty String columns are really treated as the String type
String value = column.asString();
if (value != null && !"".equals(value)) {
fieldTypes.add(type);
ok = true;
}
break;
default:
throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, fieldTypes.get(i).toString());
}
}
if (!ok) {
// based on the %d sampled records, the data type of column %d cannot be inferred
throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, String.format(Msg.get("infer_column_type_error"), records.size(), i + 1));
}
}
LOG.info("Field Types: {}", fieldTypes);
}
/**
* Execute a batch write for a single table
*
* @param tableName
* @throws SQLException
*/
private void executeBatch(String tableName) throws SQLException {
// table name
stmt.setTableName(tableName);
List<Record> records = buf.get(tableName);
// tags
String[] tagValues = tableTagValues.get(tableName);
LOG.debug("executeBatch {}", String.join(",", tagValues));
for (int i = 0; i < tagValues.length; ++i) {
stmt.setTagNString(i, tagValues[i]);
}
// timestamp
ArrayList<Long> tsList = records.stream().map(r -> r.getColumn(tsColIndex).asDate().getTime()).collect(Collectors.toCollection(ArrayList::new));
stmt.setTimestamp(0, tsList);
// fields
for (int i = 0; i < fieldList.size(); ) {
String fieldName = fieldList.get(i);
int index = fieldIndexMap.get(fieldName);
switch (fieldTypes.get(i)) {
case LONG:
ArrayList<Long> lisLong = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new));
stmt.setLong(++i, lisLong);
break;
case DOUBLE:
ArrayList<Double> lisDouble = records.stream().map(r -> r.getColumn(index).asDouble()).collect(Collectors.toCollection(ArrayList::new));
stmt.setDouble(++i, lisDouble);
break;
case STRING:
ArrayList<String> lisString = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new));
stmt.setNString(++i, lisString, 64);
break;
case DATE:
ArrayList<Long> lisTs = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new));
stmt.setTimestamp(++i, lisTs);
break;
case BOOL:
ArrayList<Boolean> lisBool = records.stream().map(r -> r.getColumn(index).asBoolean()).collect(Collectors.toCollection(ArrayList::new));
stmt.setBoolean(++i, lisBool);
break;
case BYTES:
ArrayList<String> lisBytes = records.stream().map(r -> r.getColumn(index).asString()).collect(Collectors.toCollection(ArrayList::new));
stmt.setString(++i, lisBytes, 64);
break;
default:
throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, fieldTypes.get(i).toString());
}
}
// execute
stmt.columnDataAddBatch();
stmt.columnDataExecuteBatch();
// update counter
sucCount += records.size();
}
/**
* Write out all buffered records
*/
public void flush() throws SQLException {
if (fieldTypes == null) {
List<Record> records = new ArrayList<>();
for (List<Record> lis : buf.values()) {
records.addAll(lis);
if (records.size() > 100) {
break;
}
}
if (records.size() > 0) {
initFiledTypesAndTargetTable(records);
} else {
return;
}
}
for (String tabName : buf.keySet()) {
if (buf.get(tabName).size() > 0) {
executeBatch(tabName);
}
}
stmt.columnDataCloseBatch();
}
/**
* @return number of records successfully written
*/
public long getCount() {
return sucCount;
}
}

View File

@ -1,89 +0,0 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import java.util.Properties;
public class JniConnection {
private static final long JNI_NULL_POINTER = 0L;
private static final int JNI_SUCCESSFUL = 0;
public static final String PROPERTY_KEY_CONFIG_DIR = "cfgdir";
public static final String PROPERTY_KEY_LOCALE = "locale";
public static final String PROPERTY_KEY_CHARSET = "charset";
public static final String PROPERTY_KEY_TIME_ZONE = "timezone";
private long conn;
static {
System.loadLibrary("taos");
}
public JniConnection(Properties props) throws Exception {
initImp(props.getProperty(PROPERTY_KEY_CONFIG_DIR, null));
String locale = props.getProperty(PROPERTY_KEY_LOCALE);
if (setOptions(0, locale) < 0) {
throw new Exception("Failed to set locale: " + locale + ". System default will be used.");
}
String charset = props.getProperty(PROPERTY_KEY_CHARSET);
if (setOptions(1, charset) < 0) {
throw new Exception("Failed to set charset: " + charset + ". System default will be used.");
}
String timezone = props.getProperty(PROPERTY_KEY_TIME_ZONE);
if (setOptions(2, timezone) < 0) {
throw new Exception("Failed to set timezone: " + timezone + ". System default will be used.");
}
}
public void open(String host, int port, String dbname, String user, String password) throws Exception {
if (this.conn != JNI_NULL_POINTER) {
close();
this.conn = JNI_NULL_POINTER;
}
this.conn = connectImp(host, port, dbname, user, password);
if (this.conn == JNI_NULL_POINTER) {
String errMsg = getErrMsgImp(0);
throw new Exception(errMsg);
}
}
public void insertOpentsdbJson(String json) throws Exception {
if (this.conn == JNI_NULL_POINTER) {
throw new Exception("JNI connection is NULL");
}
long result = insertOpentsdbJson(json, this.conn);
int errCode = getErrCodeImp(this.conn, result);
if (errCode != JNI_SUCCESSFUL) {
String errMsg = getErrMsgImp(result);
freeResultSetImp(this.conn, result);
throw new Exception(errMsg);
}
freeResultSetImp(this.conn, result);
}
public void close() throws Exception {
int code = this.closeConnectionImp(this.conn);
if (code != 0) {
throw new Exception("JNI closeConnection failed");
}
this.conn = JNI_NULL_POINTER;
}
private static native void initImp(String configDir);
private static native int setOptions(int optionIndex, String optionValue);
private native long connectImp(String host, int port, String dbName, String user, String password);
private native int getErrCodeImp(long connection, long pSql);
private native String getErrMsgImp(long pSql);
private native void freeResultSetImp(long connection, long pSql);
private native int closeConnectionImp(long connection);
private native long insertOpentsdbJson(String json, long pSql);
}

View File

@ -1,14 +1,12 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
public class Key {
public static final String HOST = "host";
public static final String PORT = "port";
public static final String DBNAME = "dbName";
public static final String USER = "username";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
public static final String CONNECTION = "connection";
public static final String BATCH_SIZE = "batchSize";
public static final String STABLE = "stable";
public static final String TAG_COLUMN = "tagColumn";
public static final String FIELD_COLUMN = "fieldColumn";
public static final String TIMESTAMP_COLUMN = "timestampColumn";
}
public static final String TABLE = "table";
public static final String JDBC_URL = "jdbcUrl";
public static final String COLUMN = "column";
public static final String IGNORE_TAGS_UNMATCHED = "ignoreTagsUnmatched";
}

View File

@ -5,50 +5,50 @@ import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.taosdata.jdbc.SchemalessWriter;
import com.taosdata.jdbc.enums.SchemalessProtocolType;
import com.taosdata.jdbc.enums.SchemalessTimestampType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Properties;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
public class OpentsdbDataHandler implements DataHandler {
private static final Logger LOG = LoggerFactory.getLogger(OpentsdbDataHandler.class);
private static final String DEFAULT_BATCH_SIZE = "1";
private SchemalessWriter writer;
private String jdbcUrl;
private String user;
private String password;
int batchSize;
public OpentsdbDataHandler(Configuration config) {
// the OpenTSDB JSON protocol is written via the schemaless API
this.jdbcUrl = config.getString(Key.JDBC_URL);
this.user = config.getString(Key.USERNAME, "root");
this.password = config.getString(Key.PASSWORD, "taosdata");
this.batchSize = config.getInt(Key.BATCH_SIZE, Constants.DEFAULT_BATCH_SIZE);
}
@Override
public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) {
// opentsdb json protocol use JNI and schemaless API to write
String host = properties.getProperty(Key.HOST);
int port = Integer.parseInt(properties.getProperty(Key.PORT));
String dbname = properties.getProperty(Key.DBNAME);
String user = properties.getProperty(Key.USER);
String password = properties.getProperty(Key.PASSWORD);
JniConnection conn = null;
long count = 0;
try {
conn = new JniConnection(properties);
conn.open(host, port, dbname, user, password);
LOG.info("TDengine connection established, host: " + host + ", port: " + port + ", dbname: " + dbname + ", user: " + user);
int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, DEFAULT_BATCH_SIZE));
count = writeOpentsdb(lineReceiver, conn, batchSize);
public int handle(RecordReceiver lineReceiver, TaskPluginCollector collector) {
int count = 0;
try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password)) {
LOG.info("connection[ jdbcUrl: " + jdbcUrl + ", username: " + user + "] established.");
writer = new SchemalessWriter(conn);
count = write(lineReceiver, batchSize);
} catch (Exception e) {
LOG.error(e.getMessage());
e.printStackTrace();
} finally {
try {
if (conn != null)
conn.close();
} catch (Exception e) {
e.printStackTrace();
}
LOG.info("TDengine connection closed");
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e);
}
return count;
}
private long writeOpentsdb(RecordReceiver lineReceiver, JniConnection conn, int batchSize) {
long recordIndex = 1;
private int write(RecordReceiver lineReceiver, int batchSize) throws DataXException {
int recordIndex = 1;
try {
Record record;
StringBuilder sb = new StringBuilder();
@ -56,14 +56,14 @@ public class OpentsdbDataHandler implements DataHandler {
if (batchSize == 1) {
String jsonData = recordToString(record);
LOG.debug(">>> " + jsonData);
conn.insertOpentsdbJson(jsonData);
writer.write(jsonData, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
} else if (recordIndex % batchSize == 1) {
sb.append("[").append(recordToString(record)).append(",");
} else if (recordIndex % batchSize == 0) {
sb.append(recordToString(record)).append("]");
String jsonData = sb.toString();
LOG.debug(">>> " + jsonData);
conn.insertOpentsdbJson(jsonData);
writer.write(jsonData, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
sb.delete(0, sb.length());
} else {
sb.append(recordToString(record)).append(",");
@ -72,11 +72,11 @@ public class OpentsdbDataHandler implements DataHandler {
}
if (sb.length() != 0 && sb.charAt(0) == '[') {
String jsonData = sb.deleteCharAt(sb.length() - 1).append("]").toString();
System.err.println(jsonData);
LOG.debug(">>> " + jsonData);
conn.insertOpentsdbJson(jsonData);
writer.write(jsonData, SchemalessProtocolType.JSON, SchemalessTimestampType.NOT_CONFIGURED);
}
} catch (Exception e) {
LOG.error("TDengineWriter ERROR: " + e.getMessage());
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e);
}
return recordIndex - 1;
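To make the batching above concrete, a standalone sketch (assuming batchSize = 3 and invented OpenTSDB-style payloads) of how records are folded into one JSON array before being handed to SchemalessWriter:
// Sketch of the JSON batching scheme: a single record is written as-is,
// otherwise records accumulate into "[r1,r2,...,rN]" and are written once per full batch.
public class JsonBatchSketch {
    public static void main(String[] args) {
        String[] records = {
                "{\"metric\":\"cpu\",\"timestamp\":1644830000,\"value\":1,\"tags\":{\"host\":\"a\"}}",
                "{\"metric\":\"cpu\",\"timestamp\":1644830001,\"value\":2,\"tags\":{\"host\":\"a\"}}",
                "{\"metric\":\"cpu\",\"timestamp\":1644830002,\"value\":3,\"tags\":{\"host\":\"a\"}}"
        };
        StringBuilder sb = new StringBuilder("[");
        for (int i = 0; i < records.length; i++) {
            if (i > 0) sb.append(",");
            sb.append(records[i]);
        }
        sb.append("]");
        System.out.println(sb); // this array is what writer.write(..., JSON, ...) receives
    }
}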

View File

@ -1,9 +1,7 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import org.apache.commons.codec.digest.DigestUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -12,260 +10,163 @@ import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.*;
import java.util.stream.Collectors;
public class SchemaManager {
private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class);
private String stable; // target super table name
private Map<String, String> fixedTagValue = new HashMap<>(); // fixed tag values: tag name -> tag value
private Map<String, Integer> tagIndexMap = new HashMap<>(); // dynamic tag values: tag name -> column index
private Map<String, Integer> fieldIndexMap = new HashMap<>(); // field name -> field index
private String tsColName; // timestamp column name
private int tsColIndex = -1; // timestamp column index
private List<String> fieldList = new ArrayList<>();
private List<String> tagList = new ArrayList<>();
private boolean canInferSchemaFromConfig = false;
private final Connection conn;
private TimestampPrecision precision;
public SchemaManager() {
SchemaManager(Connection conn) {
this.conn = conn;
}
public SchemaManager(Properties properties) {
getFromConfig(properties);
}
public TimestampPrecision loadDatabasePrecision() throws DataXException {
if (this.precision != null)
return this.precision;
private String mapDataxType(Column.Type type) {
switch (type) {
case LONG:
return "BIGINT";
case DOUBLE:
return "DOUBLE";
case STRING:
return "NCHAR(64)";
case DATE:
return "TIMESTAMP";
case BOOL:
return "BOOL";
case BYTES:
return "BINARY(64)";
default:
throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, type.toString());
}
}
public void setStable(String stable) {
this.stable = stable;
}
public String getStable() {
return stable;
}
private void getFromConfig(Properties properties) {
stable = properties.getProperty(Key.STABLE);
if (stable == null) {
LOG.error("Config error: no stable");
return;
}
for (Object key : properties.keySet()) {
String k = (String) key;
String v = properties.getProperty(k);
String[] ps = k.split("\\.");
if (ps.length == 1) {
continue;
}
if (k.startsWith(Key.TAG_COLUMN)) {
String tagName = ps[1];
try {
Integer tagIndex = Integer.parseInt(v);
this.tagIndexMap.put(tagName, tagIndex);
tagList.add(tagName);
} catch (NumberFormatException e) {
fixedTagValue.put(tagName, v);
tagList.add(tagName);
}
} else if (k.startsWith(Key.FIELD_COLUMN)) {
String fieldName = ps[1];
Integer fileIndex = Integer.parseInt(v);
fieldIndexMap.put(fieldName, fileIndex);
} else if (k.startsWith(Key.TIMESTAMP_COLUMN)) {
tsColName = ps[1];
tsColIndex = Integer.parseInt(v);
}
}
List<String> sortedFieldName = fieldIndexMap.entrySet().stream().sorted((x, y) -> x.getValue().compareTo(y.getValue())).map(e -> e.getKey()).collect(Collectors.toList());
fieldList.addAll(sortedFieldName); // sorting ensures the column order at auto table creation matches the column order of the input data
canInferSchemaFromConfig = tsColIndex > -1 && !(fixedTagValue.isEmpty() && tagIndexMap.isEmpty()) && !fieldIndexMap.isEmpty();
LOG.info("Config file parsed resultfixedTags=[{}] ,tags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", fixedTagValue.keySet()), String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex);
}
public boolean shouldGuessSchema() {
return !canInferSchemaFromConfig;
}
public boolean shouldCreateTable() {
return canInferSchemaFromConfig;
}
public boolean configValid() {
boolean valid = (tagList.size() > 0 && fieldList.size() > 0 && tsColIndex > -1) || (tagList.size() == 0 && fieldList.size() == 0 && tsColIndex == -1);
if (!valid) {
LOG.error("Config error: tagColumn, fieldColumn and timestampColumn must be present together or absent together.");
}
return valid;
}
/**
* Get the table schema by executing the `describe dbname.stable` command.<br/>
* The describe command returns 4 columns: Field, Type, Length, Note.<br/>
*
* @return true on success; false if the super table does not exist or another error occurs
*/
public boolean getFromDB(Connection conn) {
try {
List<String> stables = getSTables(conn);
if (!stables.contains(stable)) {
LOG.error("super table {} not exist fail to get schema from database.", stable);
return false;
}
} catch (SQLException e) {
LOG.error(e.getMessage());
e.printStackTrace();
return false;
}
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("describe " + stable);
int colIndex = 0;
ResultSet rs = stmt.executeQuery("select database()");
String dbname = null;
while (rs.next()) {
String name = rs.getString(1);
String type = rs.getString(2);
String note = rs.getString(4);
if ("TIMESTAMP".equals(type)) {
tsColName = name;
tsColIndex = colIndex;
} else if ("TAG".equals(note)) {
tagIndexMap.put(name, colIndex);
tagList.add(name);
} else {
fieldIndexMap.put(name, colIndex);
fieldList.add(name);
}
colIndex++;
dbname = rs.getString("database()");
}
if (dbname == null)
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION,
"Database not specified or available");
rs = stmt.executeQuery("show databases");
while (rs.next()) {
String name = rs.getString("name");
if (!name.equalsIgnoreCase(dbname))
continue;
String precision = rs.getString("precision");
switch (precision) {
case "ns":
this.precision = TimestampPrecision.NANOSEC;
break;
case "us":
this.precision = TimestampPrecision.MICROSEC;
break;
case "ms":
default:
this.precision = TimestampPrecision.MILLISEC;
}
}
LOG.info("table infotags=[{}], fields=[{}], tsColName={}, tsIndex={}", String.join(",", tagIndexMap.keySet()), String.join(",", fieldList), tsColName, tsColIndex);
return true;
} catch (SQLException e) {
LOG.error(e.getMessage());
e.printStackTrace();
return false;
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
return this.precision;
}
public static List<String> getSTables(Connection conn) throws SQLException {
List<String> stables = new ArrayList<>();
public Map<String, TableMeta> loadTableMeta(List<String> tables) throws DataXException {
Map<String, TableMeta> tableMetas = new HashMap<>();
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("show stables");
while (rs.next()) {
String name = rs.getString(1);
stables.add(name);
TableMeta tableMeta = buildSupTableMeta(rs);
if (!tables.contains(tableMeta.tbname))
continue;
tableMetas.put(tableMeta.tbname, tableMeta);
}
}
return stables;
}
public void createSTable(Connection conn, List<Column.Type> fieldTypes) throws SQLException {
StringBuilder sb = new StringBuilder();
sb.append("CREATE STABLE IF NOT EXISTS ").append(stable).append("(");
sb.append(tsColName).append(" ").append("TIMESTAMP,");
for (int i = 0; i < fieldList.size(); ++i) {
String fieldName = fieldList.get(i);
Column.Type dxType = fieldTypes.get(i);
sb.append(fieldName).append(' ');
String tdType = mapDataxType(dxType);
sb.append(tdType).append(',');
}
sb.deleteCharAt(sb.length() - 1);
sb.append(") TAGS(");
for (String tagName : tagList) {
sb.append(tagName).append(" NCHAR(64),");
}
sb.deleteCharAt(sb.length() - 1);
sb.append(")");
String q = sb.toString();
LOG.info("run sql" + q);
try (Statement stmt = conn.createStatement()) {
stmt.execute(q);
}
}
public String[] getTagValuesFromRecord(Record record) {
String[] tagValues = new String[tagList.size()];
for (int i = 0; i < tagList.size(); ++i) {
if (fixedTagValue.containsKey(tagList.get(i))) {
tagValues[i] = fixedTagValue.get(tagList.get(i));
} else {
int tagIndex = tagIndexMap.get(tagList.get(i));
tagValues[i] = record.getColumn(tagIndex).asString();
rs = stmt.executeQuery("show tables");
while (rs.next()) {
TableMeta tableMeta = buildSubTableMeta(rs);
if (!tables.contains(tableMeta.tbname))
continue;
tableMetas.put(tableMeta.tbname, tableMeta);
}
if (tagValues[i] == null) {
return null;
for (String tbname : tables) {
if (!tableMetas.containsKey(tbname)) {
LOG.error("table metadata of " + tbname + " is empty!");
}
}
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
return tagValues;
return tableMetas;
}
public boolean hasTimestamp(Record record) {
Column column = record.getColumn(tsColIndex);
if (column.getType() == Column.Type.DATE && column.asDate() != null) {
return true;
} else {
return false;
public Map<String, List<ColumnMeta>> loadColumnMetas(List<String> tables) throws DataXException {
Map<String, List<ColumnMeta>> ret = new HashMap<>();
for (String table : tables) {
List<ColumnMeta> columnMetaList = new ArrayList<>();
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery("describe " + table);
for (int i = 0; rs.next(); i++) {
ColumnMeta columnMeta = buildColumnMeta(rs, i == 0);
columnMetaList.add(columnMeta);
}
} catch (SQLException e) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
if (columnMetaList.isEmpty()) {
LOG.error("column metadata of " + table + " is empty!");
continue;
}
columnMetaList.stream().filter(colMeta -> colMeta.isTag).forEach(colMeta -> {
String sql = "select " + colMeta.field + " from " + table;
Object value = null;
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery(sql);
for (int i = 0; rs.next(); i++) {
value = rs.getObject(colMeta.field);
if (i > 0) {
value = null;
break;
}
}
} catch (SQLException e) {
e.printStackTrace();
}
colMeta.value = value;
});
LOG.debug("load column metadata of " + table + ": " + Arrays.toString(columnMetaList.toArray()));
ret.put(table, columnMetaList);
}
return ret;
}
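A minimal sketch of the metadata queries SchemaManager issues, assuming a reachable TDengine instance at a hypothetical address and the taos-jdbcdriver on the classpath; "stb1" is an invented table name:
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;
public class MetaQuerySketch {
    public static void main(String[] args) throws Exception {
        try (Connection conn = DriverManager.getConnection(
                "jdbc:TAOS-RS://localhost:6041/test", "root", "taosdata");
             Statement stmt = conn.createStatement()) {
            // super tables: name, columns, tags, tables
            try (ResultSet rs = stmt.executeQuery("show stables")) {
                while (rs.next()) System.out.println("stable: " + rs.getString("name"));
            }
            // sub/normal tables: table_name, stable_name (blank for normal tables)
            try (ResultSet rs = stmt.executeQuery("show tables")) {
                while (rs.next()) System.out.println("table: " + rs.getString("table_name"));
            }
            // per-table schema: Field, Type, Length, Note ("TAG" marks tag columns)
            try (ResultSet rs = stmt.executeQuery("describe stb1")) {
                while (rs.next()) System.out.println(rs.getString("Field") + " " + rs.getString("Type"));
            }
        }
    }
}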
public Map<String, Integer> getFieldIndexMap() {
return fieldIndexMap;
private TableMeta buildSupTableMeta(ResultSet rs) throws SQLException {
TableMeta tableMeta = new TableMeta();
tableMeta.tableType = TableType.SUP_TABLE;
tableMeta.tbname = rs.getString("name");
tableMeta.columns = rs.getInt("columns");
tableMeta.tags = rs.getInt("tags");
tableMeta.tables = rs.getInt("tables");
LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta);
return tableMeta;
}
public List<String> getFieldList() {
return fieldList;
private TableMeta buildSubTableMeta(ResultSet rs) throws SQLException {
TableMeta tableMeta = new TableMeta();
String stable_name = rs.getString("stable_name");
tableMeta.tableType = StringUtils.isBlank(stable_name) ? TableType.NML_TABLE : TableType.SUB_TABLE;
tableMeta.tbname = rs.getString("table_name");
tableMeta.columns = rs.getInt("columns");
tableMeta.stable_name = StringUtils.isBlank(stable_name) ? null : stable_name;
LOG.debug("load table metadata of " + tableMeta.tbname + ": " + tableMeta);
return tableMeta;
}
public String getJoinedFieldNames() {
return tsColName + ", " + String.join(", ", fieldList);
private ColumnMeta buildColumnMeta(ResultSet rs, boolean isPrimaryKey) throws SQLException {
ColumnMeta columnMeta = new ColumnMeta();
columnMeta.field = rs.getString("Field");
columnMeta.type = rs.getString("Type");
columnMeta.length = rs.getInt("Length");
columnMeta.note = rs.getString("Note");
columnMeta.isTag = columnMeta.note != null && columnMeta.note.equals("TAG");
columnMeta.isPrimaryKey = isPrimaryKey;
return columnMeta;
}
public int getTsColIndex() {
return tsColIndex;
}
public String getTagValuesPlaceHolder() {
return tagList.stream().map(x -> "?").collect(Collectors.joining(","));
}
public String getFieldValuesPlaceHolder() {
return "?, " + fieldList.stream().map(x -> "?").collect(Collectors.joining(", "));
}
/**
* Compute the sub table name:
* <ol>
* <li>join the tag values into a string of the form: tag_value1!tag_value2!tag_value3</li>
* <li>compute the MD5 hash "md5_val" of that string</li>
* <li>use "t_md5val" as the sub table name, where the "t" prefix is fixed</li>
* </ol>
* </ol>
*
* @param tagValues
* @return
*/
public String computeTableName(String[] tagValues) {
String s = String.join("!", tagValues);
return "t_" + DigestUtils.md5Hex(s);
}
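A standalone sketch of this (now removed) naming scheme, assuming commons-codec on the classpath as the old import suggests; the tag values are invented:
// Join tag values with '!', MD5-hash the result, prefix with the fixed "t_".
import org.apache.commons.codec.digest.DigestUtils;
public class TableNameSketch {
    public static void main(String[] args) {
        String[] tagValues = {"beijing", "cpu0"};
        String joined = String.join("!", tagValues); // "beijing!cpu0"
        System.out.println("t_" + DigestUtils.md5Hex(joined));
    }
}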
public int getDynamicTagCount() {
return tagIndexMap.size();
}
}

View File

@ -1,37 +1,56 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;
import java.util.Set;
public class TDengineWriter extends Writer {
private static final String PEER_PLUGIN_NAME = "peerPluginName";
static {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
public static class Job extends Writer.Job {
private Configuration originalConfig;
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
this.originalConfig.set(PEER_PLUGIN_NAME, getPeerPluginName());
// check user
String user = this.originalConfig.getString(Key.USERNAME);
if (StringUtils.isBlank(user))
throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter ["
+ Key.USERNAME + "] is not set.");
// check password
String password = this.originalConfig.getString(Key.PASSWORD);
if (StringUtils.isBlank(password))
throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter ["
+ Key.PASSWORD + "] is not set.");
// check connection
List<Object> connection = this.originalConfig.getList(Key.CONNECTION);
if (connection == null || connection.isEmpty())
throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter ["
+ Key.CONNECTION + "] is not set.");
if (connection.size() > 1)
LOG.warn("connection.size is " + connection.size() + " and only connection[0] will be used.");
Configuration conn = Configuration.from(connection.get(0).toString());
String jdbcUrl = conn.getString(Key.JDBC_URL);
if (StringUtils.isBlank(jdbcUrl))
throw DataXException.asDataXException(TDengineWriterErrorCode.REQUIRED_VALUE, "The parameter ["
+ Key.JDBC_URL + "] of connection is not set.");
// check column
}
@Override
@ -42,22 +61,30 @@ public class TDengineWriter extends Writer {
@Override
public List<Configuration> split(int mandatoryNumber) {
List<Configuration> writerSplitConfigs = new ArrayList<>();
List<Object> conns = this.originalConfig.getList(Key.CONNECTION);
for (int i = 0; i < mandatoryNumber; i++) {
writerSplitConfigs.add(this.originalConfig);
Configuration clone = this.originalConfig.clone();
Configuration conf = Configuration.from(conns.get(0).toString());
String jdbcUrl = conf.getString(Key.JDBC_URL);
clone.set(Key.JDBC_URL, jdbcUrl);
clone.set(Key.TABLE, conf.getList(Key.TABLE));
clone.remove(Key.CONNECTION);
writerSplitConfigs.add(clone);
}
return writerSplitConfigs;
}
}
public static class Task extends Writer.Task {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private Configuration writerSliceConfig;
@Override
public void init() {
this.writerSliceConfig = getPluginJobConf();
}
@Override
@ -67,23 +94,16 @@ public class TDengineWriter extends Writer {
@Override
public void startWrite(RecordReceiver lineReceiver) {
Set<String> keys = this.writerSliceConfig.getKeys();
Properties properties = new Properties();
for (String key : keys) {
String value = this.writerSliceConfig.getString(key);
properties.setProperty(key, value);
}
if (!keys.contains(Key.USER)) {
properties.setProperty(Key.USER, "root");
}
if (!keys.contains(Key.PASSWORD)) {
properties.setProperty(Key.PASSWORD, "taosdata");
}
LOG.debug("========================properties==========================\n" + properties);
String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
LOG.debug("start to handle record from: " + peerPluginName);
DataHandler handler = DataHandlerFactory.build(peerPluginName);
long records = handler.handle(lineReceiver, properties, getTaskPluginCollector());
DataHandler handler;
if (peerPluginName.equals("opentsdbreader"))
handler = new OpentsdbDataHandler(this.writerSliceConfig);
else
handler = new DefaultDataHandler(this.writerSliceConfig);
long records = handler.handle(lineReceiver, getTaskPluginCollector());
LOG.debug("handle data finished, records: " + records);
}

View File

@ -3,13 +3,16 @@ package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.spi.ErrorCode;
public enum TDengineWriterErrorCode implements ErrorCode {
RUNTIME_EXCEPTION("TDengineWriter-00", "运行时异常"),
TYPE_ERROR("TDengineWriter-00", "Datax类型无法正确映射到TDengine类型");
REQUIRED_VALUE("TDengineWriter-00", "缺失必要的值"),
ILLEGAL_VALUE("TDengineWriter-01", "值非法"),
RUNTIME_EXCEPTION("TDengineWriter-02", "运行时异常"),
TYPE_ERROR("TDengineWriter-03", "Datax类型无法正确映射到TDengine类型");
private final String code;
private final String description;
private TDengineWriterErrorCode(String code, String description) {
TDengineWriterErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@ -26,7 +29,6 @@ public enum TDengineWriterErrorCode implements ErrorCode {
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s]. ", this.code,
this.description);
return String.format("Code:[%s], Description:[%s]. ", this.code, this.description);
}
}

View File

@ -0,0 +1,22 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
public class TableMeta {
TableType tableType;
String tbname;
int columns;
int tags;
int tables;
String stable_name;
@Override
public String toString() {
return "TableMeta{" +
"tableType=" + tableType +
", tbname='" + tbname + '\'' +
", columns=" + columns +
", tags=" + tags +
", tables=" + tables +
", stable_name='" + stable_name + '\'' +
'}';
}
}

View File

@ -0,0 +1,5 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
public enum TableType {
SUP_TABLE, SUB_TABLE, NML_TABLE
}

View File

@ -0,0 +1,5 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
public enum TimestampPrecision {
MILLISEC, MICROSEC, NANOSEC
}

View File

@ -1,105 +0,0 @@
/* DO NOT EDIT THIS FILE - it is machine generated */
#include <jni.h>
/* Header for class com_alibaba_datax_plugin_writer_JniConnection */
#ifndef _Included_com_alibaba_datax_plugin_writer_JniConnection
#define _Included_com_alibaba_datax_plugin_writer_JniConnection
#ifdef __cplusplus
extern "C" {
#endif
#undef com_alibaba_datax_plugin_writer_JniConnection_JNI_NULL_POINTER
#define com_alibaba_datax_plugin_writer_JniConnection_JNI_NULL_POINTER 0LL
#undef com_alibaba_datax_plugin_writer_JniConnection_JNI_SUCCESSFUL
#define com_alibaba_datax_plugin_writer_JniConnection_JNI_SUCCESSFUL 0L
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: initImp
* Signature: (Ljava/lang/String;)V
*/
JNIEXPORT void JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_initImp
(JNIEnv *, jclass, jstring);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: setOptions
* Signature: (ILjava/lang/String;)I
*/
JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_setOptions
(JNIEnv *, jclass, jint, jstring);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: getTsCharset
* Signature: ()Ljava/lang/String;
*/
JNIEXPORT jstring JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getTsCharset
(JNIEnv *, jclass);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: connectImp
* Signature: (Ljava/lang/String;ILjava/lang/String;Ljava/lang/String;Ljava/lang/String;)J
*/
JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_connectImp
(JNIEnv *, jobject, jstring, jint, jstring, jstring, jstring);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: executeQueryImp
* Signature: ([BJ)J
*/
JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_executeQueryImp
(JNIEnv *, jobject, jbyteArray, jlong);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: getErrCodeImp
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrCodeImp
(JNIEnv *, jobject, jlong, jlong);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: getErrMsgImp
* Signature: (J)Ljava/lang/String;
*/
JNIEXPORT jstring JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrMsgImp
(JNIEnv *, jobject, jlong);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: getErrMsgByCode
* Signature: (J)Ljava/lang/String;
*/
JNIEXPORT jstring JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getErrMsgByCode
(JNIEnv *, jobject, jlong);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: getAffectedRowsImp
* Signature: (JJ)I
*/
JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_getAffectedRowsImp
(JNIEnv *, jobject, jlong, jlong);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: closeConnectionImp
* Signature: (J)I
*/
JNIEXPORT jint JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_closeConnectionImp
(JNIEnv *, jobject, jlong);
/*
* Class: com_alibaba_datax_plugin_writer_JniConnection
* Method: insertOpentsdbJson
* Signature: (Ljava/lang/String;J)J
*/
JNIEXPORT jlong JNICALL Java_com_alibaba_datax_plugin_writer_JniConnection_insertOpentsdbJson
(JNIEnv *, jobject, jstring, jlong);
#ifdef __cplusplus
}
#endif
#endif

View File

@ -0,0 +1,235 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.record.DefaultRecord;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class DefaultDataHandlerTest {
private static final String host = "192.168.56.105";
private static Connection conn;
@Test
public void writeSupTableBySQL() {
// given
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"column\": [\"tbname\", \"ts\", \"f1\", \"f2\", \"t1\"]," +
"\"table\":[\"stb1\"]," +
"\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," +
"\"batchSize\": \"1000\"" +
"}");
long current = System.currentTimeMillis();
List<Record> recordList = IntStream.range(1, 11).mapToObj(i -> {
Record record = new DefaultRecord();
record.addColumn(new StringColumn("tb" + (i + 10)));
record.addColumn(new DateColumn(current + 1000 * i));
record.addColumn(new LongColumn(1));
record.addColumn(new LongColumn(2));
record.addColumn(new LongColumn(i));
return record;
}).collect(Collectors.toList());
// when
DefaultDataHandler handler = new DefaultDataHandler(configuration);
List<String> tables = configuration.getList("table", String.class);
SchemaManager schemaManager = new SchemaManager(conn);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
handler.setColumnMetas(columnMetas);
int count = handler.writeBatch(conn, recordList);
// then
Assert.assertEquals(10, count);
}
@Test
public void writeSupTableBySQL_2() {
// given
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"column\": [\"tbname\", \"ts\", \"f1\", \"t1\"]," +
"\"table\":[\"stb1\"]," +
"\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," +
"\"batchSize\": \"1000\"" +
"}");
long current = System.currentTimeMillis();
List<Record> recordList = IntStream.range(1, 11).mapToObj(i -> {
Record record = new DefaultRecord();
record.addColumn(new StringColumn("tb" + (i + 10)));
record.addColumn(new DateColumn(current + 1000 * i));
record.addColumn(new LongColumn(1));
record.addColumn(new LongColumn(i));
return record;
}).collect(Collectors.toList());
// when
DefaultDataHandler handler = new DefaultDataHandler(configuration);
List<String> tables = configuration.getList("table", String.class);
SchemaManager schemaManager = new SchemaManager(conn);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
handler.setColumnMetas(columnMetas);
int count = handler.writeBatch(conn, recordList);
// then
Assert.assertEquals(10, count);
}
@Test
public void writeSubTableWithTableName() {
// given
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"column\": [\"tbname\", \"ts\", \"f1\", \"f2\", \"t1\"]," +
"\"table\":[\"tb1\"]," +
"\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," +
"\"batchSize\": \"1000\"" +
"}");
long current = System.currentTimeMillis();
List<Record> recordList = IntStream.range(1, 11).mapToObj(i -> {
Record record = new DefaultRecord();
record.addColumn(new StringColumn("tb" + i));
record.addColumn(new DateColumn(current + 1000 * i));
record.addColumn(new LongColumn(1));
record.addColumn(new LongColumn(2));
record.addColumn(new LongColumn(i));
return record;
}).collect(Collectors.toList());
// when
DefaultDataHandler handler = new DefaultDataHandler(configuration);
List<String> tables = configuration.getList("table", String.class);
SchemaManager schemaManager = new SchemaManager(conn);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
handler.setColumnMetas(columnMetas);
int count = handler.writeBatch(conn, recordList);
// then
Assert.assertEquals(1, count);
}
@Test
public void writeSubTableWithoutTableName() {
// given
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"column\": [\"ts\", \"f1\", \"f2\", \"t1\"]," +
"\"table\":[\"tb1\"]," +
"\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," +
"\"batchSize\": \"1000\"," +
"\"ignoreTagsUnmatched\": \"true\"" +
"}");
long current = System.currentTimeMillis();
List<Record> recordList = IntStream.range(1, 11).mapToObj(i -> {
Record record = new DefaultRecord();
record.addColumn(new DateColumn(current + 1000 * i));
record.addColumn(new LongColumn(1));
record.addColumn(new LongColumn(2));
record.addColumn(new LongColumn(i));
return record;
}).collect(Collectors.toList());
// when
DefaultDataHandler handler = new DefaultDataHandler(configuration);
List<String> tables = configuration.getList("table", String.class);
SchemaManager schemaManager = new SchemaManager(conn);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
handler.setColumnMetas(columnMetas);
int count = handler.writeBatch(conn, recordList);
// then
Assert.assertEquals(1, count);
}
@Test
public void writeNormalTable() {
// given
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"column\": [\"ts\", \"f1\", \"f2\", \"t1\"]," +
"\"table\":[\"weather\"]," +
"\"jdbcUrl\":\"jdbc:TAOS-RS://" + host + ":6041/test\"," +
"\"batchSize\": \"1000\"," +
"\"ignoreTagsUnmatched\": \"true\"" +
"}");
long current = System.currentTimeMillis();
List<Record> recordList = IntStream.range(1, 11).mapToObj(i -> {
Record record = new DefaultRecord();
record.addColumn(new DateColumn(current + 1000 * i));
record.addColumn(new LongColumn(1));
record.addColumn(new LongColumn(2));
record.addColumn(new LongColumn(i));
return record;
}).collect(Collectors.toList());
// when
DefaultDataHandler handler = new DefaultDataHandler(configuration);
List<String> tables = configuration.getList("table", String.class);
SchemaManager schemaManager = new SchemaManager(conn);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
handler.setColumnMetas(columnMetas);
int count = handler.writeBatch(conn, recordList);
// then
Assert.assertEquals(10, count);
}
@BeforeClass
public static void beforeClass() throws SQLException {
conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041", "root", "taosdata");
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists scm_test");
stmt.execute("create database if not exists scm_test");
stmt.execute("use scm_test");
stmt.execute("create table stb1(ts timestamp, f1 int, f2 int) tags(t1 nchar(32))");
stmt.execute("create table stb2(ts timestamp, f1 int, f2 int, f3 int) tags(t1 int, t2 int)");
stmt.execute("create table tb1 using stb1 tags(1)");
stmt.execute("create table tb2 using stb1 tags(2)");
stmt.execute("create table tb3 using stb2 tags(1,1)");
stmt.execute("create table tb4 using stb2 tags(2,2)");
stmt.execute("create table weather(ts timestamp, f1 int, f2 int, f3 int, t1 int, t2 int)");
}
}
@AfterClass
public static void afterClass() throws SQLException {
if (conn != null) {
conn.close();
}
}
}
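In the super-table tests above, each record's leading "tbname" value names the target sub-table. Presumably the handler expands such a record into TDengine's auto-create insert form; the generated statement is not visible in this diff, so the following is only an assumed illustration:

    // Assumed statement shape (not taken from this commit) for the first record
    // of writeSupTableBySQL: auto-create the sub-table named by the record's
    // tbname value, then insert the row together with its tag value.
    long ts = System.currentTimeMillis();
    String sql = "insert into tb11 using stb1 tags('1') values(" + ts + ", 1, 2)";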

View File

@ -0,0 +1,85 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.record.DefaultRecord;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class DefaultDataHandlerTest2 {
private static final String host = "192.168.1.93";
private static Connection conn;
@Test
public void writeSupTableBySchemaless() throws SQLException {
// given
Configuration configuration = Configuration.from("{" +
"\"username\": \"root\"," +
"\"password\": \"taosdata\"," +
"\"column\": [\"ts\", \"f1\", \"f2\", \"t1\"]," +
"\"table\":[\"stb1\"]," +
"\"jdbcUrl\":\"jdbc:TAOS://" + host + ":6030/scm_test\"," +
"\"batchSize\": \"1000\"" +
"}");
String jdbcUrl = configuration.getString("jdbcUrl");
Connection connection = DriverManager.getConnection(jdbcUrl, "root", "taosdata");
long current = System.currentTimeMillis();
List<Record> recordList = IntStream.range(1, 11).mapToObj(i -> {
Record record = new DefaultRecord();
record.addColumn(new DateColumn(current + 1000 * i));
record.addColumn(new LongColumn(1));
record.addColumn(new LongColumn(2));
record.addColumn(new StringColumn("t" + i + " 22"));
return record;
}).collect(Collectors.toList());
// when
DefaultDataHandler handler = new DefaultDataHandler(configuration);
List<String> tables = configuration.getList("table", String.class);
SchemaManager schemaManager = new SchemaManager(connection);
Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
Map<String, List<ColumnMeta>> columnMetas = schemaManager.loadColumnMetas(tables);
handler.setTableMetas(tableMetas);
handler.setColumnMetas(columnMetas);
handler.setSchemaManager(schemaManager);
int count = handler.writeBatch(connection, recordList);
        // then
        Assert.assertEquals(10, count);

        connection.close();
    }
@BeforeClass
public static void beforeClass() throws SQLException {
conn = DriverManager.getConnection("jdbc:TAOS-RS://" + host + ":6041", "root", "taosdata");
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists scm_test");
stmt.execute("create database if not exists scm_test");
stmt.execute("use scm_test");
stmt.execute("create table stb1(ts timestamp, f1 int, f2 int) tags(t1 nchar(32))");
}
}
@AfterClass
public static void afterClass() throws SQLException {
if (conn != null) {
conn.close();
}
}
}
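writeSupTableBySchemaless goes through the native connection (port 6030) rather than REST. The payload the handler builds is not shown in this diff; under TDengine's InfluxDB-style line protocol, one of the records above might look roughly like the sketch below, where the i32 suffix marks a 32-bit integer field and the space inside the tag value "t1 22" must be escaped (all of this is an assumption about the eventual wire format):

    // Hypothetical schemaless line-protocol record for stb1:
    // measurement,tag_set field_set timestamp
    String line = "stb1,t1=t1\\ 22 f1=1i32,f2=2i32 1645000000000";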

View File

@ -0,0 +1,65 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.core.Engine;
import org.junit.Assert;
import org.junit.Test;
import java.sql.*;
public class EngineTest {
@Test
public void opentsdb2tdengine() throws SQLException {
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/opentsdb2tdengine.json"};
System.setProperty("datax.home", "../target/datax/datax");
        try {
            Engine.entry(params);
        } catch (Throwable e) {
            e.printStackTrace();
            Assert.fail("DataX job failed: " + e.getMessage());
        }
// assert
String jdbcUrl = "jdbc:TAOS://192.168.56.105:6030/test?timestampFormat=TIMESTAMP";
try (Connection conn = DriverManager.getConnection(jdbcUrl, "root", "taosdata")) {
Statement stmt = conn.createStatement();
ResultSet rs = stmt.executeQuery("select count(*) from weather_temperature");
int rows = 0;
while (rs.next()) {
rows = rs.getInt("count(*)");
}
Assert.assertEquals(5, rows);
stmt.close();
}
}
@Test
public void mysql2tdengine() {
// given
        // MySQL DDL:
        //   create table t(id int primary key AUTO_INCREMENT, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, f6 double, ts timestamp, dt datetime, f7 nchar(64));
        //   insert into t(f1,f2,f3,f4,f5,f6,ts,dt,f7) values(1,2,3,4,5,6,'2022-01-28 12:00:00','2022-01-28 12:00:00','beijing');
        // TDengine DDL:
        //   create table t(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint, f5 float, f6 double, dt timestamp, f7 nchar(64));
// when
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/mysql2tdengine.json"};
System.setProperty("datax.home", "../target/datax/datax");
        try {
            Engine.entry(params);
        } catch (Throwable e) {
            e.printStackTrace();
            Assert.fail("DataX job failed: " + e.getMessage());
        }
}
@Test
public void tdengine2tdengine() {
String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/tdengine2tdengine.json"};
System.setProperty("datax.home", "../target/datax/datax");
        try {
            Engine.entry(params);
        } catch (Throwable e) {
            e.printStackTrace();
            Assert.fail("DataX job failed: " + e.getMessage());
        }
}
}

View File

@ -1,21 +0,0 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import org.junit.Test;
import java.util.Properties;
public class JniConnectionTest {
@Test
public void test() throws Exception {
JniConnection connection = new JniConnection(new Properties());
connection.open("192.168.56.105", 6030, "test", "root", "taosdata");
String json = "{\"metric\":\"weather_temperature\",\"timestamp\":1609430400000,\"value\":123,\"tags\":{\"location\":\"beijing\",\"id\":\"t123\"}}";
connection.insertOpentsdbJson(json);
connection.close();
}
}

View File

@ -0,0 +1,88 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
public class SchemaManagerTest {
private static Connection conn;
@Test
public void loadTableMeta() throws SQLException {
// given
SchemaManager schemaManager = new SchemaManager(conn);
List<String> tables = Arrays.asList("stb1", "stb2", "tb1", "tb3", "weather");
// when
Map<String, TableMeta> tableMetaMap = schemaManager.loadTableMeta(tables);
// then
TableMeta stb1 = tableMetaMap.get("stb1");
Assert.assertEquals(TableType.SUP_TABLE, stb1.tableType);
Assert.assertEquals("stb1", stb1.tbname);
Assert.assertEquals(3, stb1.columns);
Assert.assertEquals(1, stb1.tags);
Assert.assertEquals(2, stb1.tables);
TableMeta tb3 = tableMetaMap.get("tb3");
Assert.assertEquals(TableType.SUB_TABLE, tb3.tableType);
Assert.assertEquals("tb3", tb3.tbname);
Assert.assertEquals(4, tb3.columns);
Assert.assertEquals("stb2", tb3.stable_name);
TableMeta weather = tableMetaMap.get("weather");
Assert.assertEquals(TableType.NML_TABLE, weather.tableType);
Assert.assertEquals("weather", weather.tbname);
Assert.assertEquals(6, weather.columns);
Assert.assertNull(weather.stable_name);
}
@Test
public void loadColumnMetas() {
// given
SchemaManager schemaManager = new SchemaManager(conn);
List<String> tables = Arrays.asList("stb1", "stb2", "tb1", "tb3", "weather");
// when
Map<String, List<ColumnMeta>> columnMetaMap = schemaManager.loadColumnMetas(tables);
// then
List<ColumnMeta> stb1 = columnMetaMap.get("stb1");
Assert.assertEquals(4, stb1.size());
}
@BeforeClass
public static void beforeClass() throws SQLException {
conn = DriverManager.getConnection("jdbc:TAOS-RS://192.168.56.105:6041", "root", "taosdata");
try (Statement stmt = conn.createStatement()) {
stmt.execute("drop database if exists scm_test");
stmt.execute("create database if not exists scm_test");
stmt.execute("use scm_test");
stmt.execute("create table stb1(ts timestamp, f1 int, f2 int) tags(t1 int)");
stmt.execute("create table stb2(ts timestamp, f1 int, f2 int, f3 int) tags(t1 int, t2 int)");
stmt.execute("insert into tb1 using stb1 tags(1) values(now, 1, 2)");
stmt.execute("insert into tb2 using stb1 tags(2) values(now, 1, 2)");
stmt.execute("insert into tb3 using stb2 tags(1,1) values(now, 1, 2, 3)");
stmt.execute("insert into tb4 using stb2 tags(2,2) values(now, 1, 2, 3)");
stmt.execute("create table weather(ts timestamp, f1 int, f2 int, f3 int, t1 int, t2 int)");
}
}
@AfterClass
public static void afterClass() throws SQLException {
if (conn != null) {
conn.close();
}
}
}
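loadColumnMetas returns one ColumnMeta per row of "describe &lt;table&gt;", which is why stb1 yields 4 entries (ts, f1, f2 plus the tag t1). The loader itself is not included in this excerpt; as a hedged sketch of how it could work (assumes import java.sql.* and java.util.*):

    // Hypothetical sketch, not code from this commit: "describe <table>"
    // returns Field, Type, Length and Note columns; Note reads "TAG" for tag
    // columns, and the first row is always the timestamp primary key.
    static List<ColumnMeta> loadColumnMetas(Connection conn, String table) throws SQLException {
        List<ColumnMeta> columns = new ArrayList<>();
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("describe " + table)) {
            boolean first = true;
            while (rs.next()) {
                ColumnMeta meta = new ColumnMeta();
                meta.field = rs.getString("Field");
                meta.type = rs.getString("Type");
                meta.length = rs.getInt("Length");
                meta.note = rs.getString("Note");
                meta.isTag = "TAG".equals(meta.note);
                meta.isPrimaryKey = first;
                first = false;
                columns.add(meta);
            }
        }
        return columns;
    }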

View File

@ -1,31 +1,62 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;

import com.alibaba.datax.common.util.Configuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;

import java.util.List;

public class TDengineWriterTest {

    TDengineWriter.Job job;

    @Before
    public void before() {
        job = new TDengineWriter.Job();
        Configuration configuration = Configuration.from("{" +
                "\"username\": \"root\"," +
                "\"password\": \"taosdata\"," +
                "\"column\": [\"ts\", \"f1\", \"f2\", \"t1\"]," +
                "\"connection\": [{\"table\":[\"weather\"],\"jdbcUrl\":\"jdbc:TAOS-RS://master:6041/test\"}]," +
                "\"batchSize\": \"1000\"" +
                "}");
        job.setPluginJobConf(configuration);
    }

    @Test
    public void jobInit() {
        // when
        job.init();

        // assert
        Configuration conf = job.getPluginJobConf();
        Assert.assertEquals("root", conf.getString("username"));
        Assert.assertEquals("taosdata", conf.getString("password"));
        Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("connection[0].jdbcUrl"));
        Assert.assertEquals(new Integer(1000), conf.getInt("batchSize"));
        Assert.assertEquals("ts", conf.getString("column[0]"));
        Assert.assertEquals("f2", conf.getString("column[2]"));
    }

    @Test
    public void jobSplit() {
        // when
        job.init();
        List<Configuration> configurationList = job.split(10);

        // assert
        Assert.assertEquals(10, configurationList.size());
        for (Configuration conf : configurationList) {
            Assert.assertEquals("root", conf.getString("username"));
            Assert.assertEquals("taosdata", conf.getString("password"));
            Assert.assertEquals("jdbc:TAOS-RS://master:6041/test", conf.getString("jdbcUrl"));
            Assert.assertEquals(new Integer(1000), conf.getInt("batchSize"));
            Assert.assertEquals("ts", conf.getString("column[0]"));
            Assert.assertEquals("f2", conf.getString("column[2]"));
        }
    }
}

View File

@ -0,0 +1,57 @@
{
"job": {
"content": [
{
"reader": {
"name": "mysqlreader",
"parameter": {
"username": "root",
"password": "123456",
"column": [
"*"
],
"splitPk": "id",
"connection": [
{
"table": [
"t"
],
"jdbcUrl": [
"jdbc:mysql://192.168.56.105:3306/test?useSSL=false&useUnicode=true&characterEncoding=utf8"
]
}
]
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"ts",
"f1",
"f2",
"t1"
],
"connection": [
{
"table": [
"st"
],
"jdbcUrl": "jdbc:TAOS://192.168.56.105:6030/test?timestampFormat=TIMESTAMP"
}
],
"batchSize": 1000,
"ignoreTagsUnmatched": true
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,36 @@
{
"job":{
"content":[{
"reader": {
"name": "opentsdbreader",
"parameter": {
"endpoint": "http://192.168.56.105:4242",
"column": ["weather_temperature"],
"beginDateTime": "2021-01-01 00:00:00",
"endDateTime": "2021-01-01 01:00:00"
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"username": "root",
"password": "taosdata",
"connection": [
{
"table": [
"meters"
],
"jdbcUrl": "jdbc:TAOS://192.168.56.105:6030/test?timestampFormat=TIMESTAMP"
}
],
"batchSize": 1000
}
}
}],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,51 @@
{
"job": {
"content": [
{
"reader": {
"name": "tdenginereader",
"parameter": {
"username": "root",
"password": "taosdata",
"connection": [
{
"table": [
"meters"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/test?timestampFormat=TIMESTAMP"
}
],
"column": [
"ts",
"current",
"voltage",
"phase",
"groupid",
"location"
],
"beginDateTime": "2017-07-14 10:40:00",
"endDateTime": "2017-08-14 10:40:00",
"splitInterval": "1d"
}
},
"writer": {
"name": "tdenginewriter",
"parameter": {
"host": "192.168.56.105",
"port": 6030,
"dbName": "test2",
"username": "root",
"password": "taosdata",
"batchSize": 1000,
"stable": "meters"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}