mongodb2tdengine test and refine

dingbo 2021-11-18 18:06:10 +08:00
parent bf01999222
commit ca1851fb99
9 changed files with 150 additions and 25 deletions

View File

@ -11,7 +11,7 @@
"name": "mongodbreader",
"parameter": {
"address": [
"123.56.104.14:27017"
"127.0.0.1:27017"
],
"userName": "admin678",
"mechanism": "SCRAM-SHA-1",
@ -50,7 +50,7 @@
"writer": {
"name": "tdenginewriter",
"parameter": {
"host": "123.56.104.14",
"host": "127.0.0.1",
"port": 6030,
"dbname": "test",
"user": "root",

View File

@ -290,6 +290,9 @@ TAGS(
## 5 Constraints and Limitations
1. When this plugin creates the super table automatically, the length of NCHAR columns is fixed at 64, so data sources containing strings longer than 64 characters are not supported.
2. Tag columns must not contain null values; records that do are filtered out.
## FAQ
### How do I choose the range of data to synchronize?
@ -300,10 +303,14 @@ TAGS(
If the Reader plugin supports reading multiple tables in one job, the Writer plugin can import multiple tables in one job. If the Reader does not support multiple tables, create several jobs and import them separately. The Writer plugin is only responsible for writing data.
### How many tables in TDengine does one source table correspond to after import?
This is determined by tagColumn. If every tag column has the same value for all rows, there is only one target table. The target super table gets one sub-table for each distinct combination of tag values in the source table.
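As a concrete illustration (a minimal sketch; the tag values below are invented, and commons-codec is assumed on the classpath since the plugin itself uses DigestUtils), the sub-table name is derived from the tag values the same way `SchemaManager.computeTableName` does further down in this diff, so identical tag combinations always map to the same sub-table:

```java
import org.apache.commons.codec.digest.DigestUtils;

public class SubTableNameDemo {
    // Same rule as SchemaManager.computeTableName: join the tag values and hash them.
    static String computeTableName(String[] tagValues) {
        String s = String.join("!", tagValues);
        return "t_" + DigestUtils.md5Hex(s);
    }

    public static void main(String[] args) {
        // Identical tag combinations map to the same sub-table; a new combination creates a new one.
        System.out.println(computeTableName(new String[]{"beijing", "device01"}));
        System.out.println(computeTableName(new String[]{"beijing", "device01"})); // same name as above
        System.out.println(computeTableName(new String[]{"shanghai", "device02"})); // different sub-table
    }
}
```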
### Is the column order of the source table and the target table the same?
TDengine requires the first column of every table to be the timestamp column, followed by the ordinary fields, with the tag columns last. If the source table does not follow this order, the plugin reorders the columns automatically when it creates the table.
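For illustration only, this is the shape of statement that `SchemaManager.createSTable` builds. The table, field, and tag names below are invented, and the exact DataX-to-TDengine type mapping (`mapDataxType`) is not shown in this diff, so BIGINT/DOUBLE/NCHAR(64) are assumptions here:

```java
// Hypothetical auto-created super table for a source with fields (price, volume),
// timestamp column ts, and tags (city, device), regardless of their order in the source:
String createStable =
        "CREATE STABLE IF NOT EXISTS market_data (" +
        "ts TIMESTAMP,"  +   // timestamp column always comes first
        "price DOUBLE,"  +   // ordinary fields next
        "volume BIGINT"  +
        ") TAGS(city NCHAR(64), device NCHAR(64))";   // tag columns last
```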
### How does the plugin determine the data type of each column?
It samples the first batch of received data and infers each column's type from it. Because the schema is derived from the data, make sure that "good" data makes up the majority.
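A condensed sketch of that sampling rule (the full logic lives in `guessFieldTypes` in this commit; `Column` and `Record` are DataX's own classes): a column's type is taken from the first sampled record in which it holds a usable value, and null values, which surface as String columns, are skipped.

```java
// Sketch of the per-column inference rule used while sampling the first batch.
Column.Type guessType(List<Record> samples, int colIndex) {
    for (Record r : samples) {
        Column c = r.getColumn(colIndex);
        if (c.getType() == Column.Type.STRING) {
            // null values arrive as String columns, so only a non-empty String
            // really proves the column is a String
            String v = c.asString();
            if (v != null && !v.isEmpty()) return Column.Type.STRING;
        } else if (c.getRawData() != null) {
            return c.getType(); // LONG, DOUBLE, DATE, BOOL or BYTES
        }
    }
    throw new IllegalStateException("cannot infer the type of column " + colIndex);
}
```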

View File

@ -1,10 +1,11 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import java.util.Properties;
public interface DataHandler {
long handle(RecordReceiver lineReceiver, Properties properties);
long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector);
}

View File

@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.taosdata.jdbc.TSDBPreparedStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -16,7 +17,6 @@ import java.util.Properties;
*/
public class DefaultDataHandler implements DataHandler {
private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
static {
try {
Class.forName("com.taosdata.jdbc.TSDBDriver");
@ -26,7 +26,7 @@ public class DefaultDataHandler implements DataHandler {
}
@Override
public long handle(RecordReceiver lineReceiver, Properties properties) {
public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) {
SchemaManager schemaManager = new SchemaManager(properties);
if (!schemaManager.configValid()) {
return 0;
@ -47,7 +47,11 @@ public class DefaultDataHandler implements DataHandler {
}
int batchSize = Integer.parseInt(properties.getProperty(Key.BATCH_SIZE, "1000"));
return write(lineReceiver, conn, batchSize, schemaManager);
if (batchSize < 5) {
LOG.error("batchSize太小会增加自动类型推断错误的概率建议改大后重试");
return 0;
}
return write(lineReceiver, conn, batchSize, schemaManager, collector);
} catch (Exception e) {
LOG.error("write failed " + e.getMessage());
e.printStackTrace();
@ -79,18 +83,15 @@ public class DefaultDataHandler implements DataHandler {
* @return number of records written successfully
* @throws SQLException
*/
private long write(RecordReceiver lineReceiver, Connection conn, int batchSize, SchemaManager scm) throws SQLException {
private long write(RecordReceiver lineReceiver, Connection conn, int batchSize, SchemaManager scm, TaskPluginCollector collector) throws SQLException {
Record record = lineReceiver.getFromReader();
if (record == null) {
return 0;
}
if (scm.shouldCreateTable()) {
scm.createSTable(conn, record);
}
String pq = String.format("INSERT INTO ? USING %s TAGS(%s) (%s) values (%s)", scm.getStable(), scm.getTagValuesPlaceHolder(), scm.getJoinedFieldNames(), scm.getFieldValuesPlaceHolder());
LOG.info("Prepared SQL: {}", pq);
try (TSDBPreparedStatement stmt = (TSDBPreparedStatement) conn.prepareStatement(pq)) {
JDBCBatchWriter batchWriter = new JDBCBatchWriter(stmt, scm, batchSize);
JDBCBatchWriter batchWriter = new JDBCBatchWriter(conn, stmt, scm, batchSize, collector);
do {
batchWriter.append(record);
} while ((record = lineReceiver.getFromReader()) != null);

View File

@ -3,10 +3,13 @@ package com.alibaba.datax.plugin.writer.tdenginewriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.AbstractTaskPlugin;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.taosdata.jdbc.TSDBPreparedStatement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.HashMap;
@ -26,10 +29,12 @@ import java.util.stream.Collectors;
*/
public class JDBCBatchWriter {
public static final Logger LOG = LoggerFactory.getLogger(JDBCBatchWriter.class);
private TSDBPreparedStatement stmt;
private SchemaManager scm;
private Connection conn;
private int batchSize;
private TaskPluginCollector collector;
// Buffered Records, keyed by tableName
Map<String, List<Record>> buf = new HashMap<>();
// Cached tag values of each table, keyed by tableName
@ -37,25 +42,57 @@ public class JDBCBatchWriter {
private long sucCount = 0;
private final int tsColIndex;
private List<String> fieldList;
// Minimum number of columns each record should contain, used to validate incoming data
private int minColNum = 0;
private Map<String, Integer> fieldIndexMap;
private List<Column.Type> fieldTypes = null;
public JDBCBatchWriter(TSDBPreparedStatement stmt, SchemaManager scm, int batchSize) {
public JDBCBatchWriter(Connection conn, TSDBPreparedStatement stmt, SchemaManager scm, int batchSize, TaskPluginCollector collector) {
this.conn = conn;
this.stmt = stmt;
this.scm = scm;
this.batchSize = batchSize;
this.collector = collector;
this.tsColIndex = scm.getTsColIndex();
this.fieldList = scm.getFieldList();
this.fieldIndexMap = scm.getFieldIndexMap();
this.minColNum = 1 + fieldList.size() + scm.getDynamicTagCount();
}
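// Infer the DataX type of each field from the sampled records and, if needed,
// create the super table from the inferred types. No-op once fieldTypes is already set.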
public void initFiledTypesAndTargetTable(List<Record> records) throws SQLException {
if (fieldTypes != null) {
return;
}
guessFieldTypes(records);
if (scm.shouldCreateTable()) {
scm.createSTable(conn, fieldTypes);
}
}
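// Validate the record (column count, tag values, timestamp), route it to the buffer
// of its target sub-table, and write that buffer out once it reaches batchSize.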
public void append(Record record) throws SQLException {
int columnNum = record.getColumnNumber();
if (columnNum < minColNum) {
collector.collectDirtyRecord(record, "实际列数小于期望列数");
return;
}
String[] tagValues = scm.getTagValuesFromRecord(record);
if (tagValues == null) {
collector.collectDirtyRecord(record, "标签列包含null");
return;
}
if (!scm.hasTimestamp(record)) {
collector.collectDirtyRecord(record, "时间戳列为null或类型错误");
return;
}
String tableName = scm.computeTableName(tagValues);
if (buf.containsKey(tableName)) {
List<Record> lis = buf.get(tableName);
lis.add(record);
if (lis.size() == batchSize) {
if (fieldTypes == null) {
initFiledTypesAndTargetTable(lis);
}
executeBatch(tableName);
lis.clear();
}
@ -67,6 +104,49 @@ public class JDBCBatchWriter {
}
}
/**
* Only the String type is special: testing shows that columns whose value is null come through as String columns, so a Column of type String does not necessarily mean the column is really a String.
*
* @param records
*/
private void guessFieldTypes(List<Record> records) {
fieldTypes = new ArrayList<>(fieldList.size());
for (int i = 0; i < fieldList.size(); ++i) {
int colIndex = fieldIndexMap.get(fieldList.get(i));
boolean ok = false;
for (int j = 0; j < records.size() && !ok; ++j) {
Column column = records.get(j).getColumn(colIndex);
Column.Type type = column.getType();
switch (type) {
case LONG:
case DOUBLE:
case DATE:
case BOOL:
case BYTES:
if (column.getRawData() != null) {
fieldTypes.add(type);
ok = true;
}
break;
case STRING:
// Only a non-null, non-empty String column is really treated as the String type
String value = column.asString();
if (value != null && !"".equals(value)) {
fieldTypes.add(type);
ok = true;
}
break;
default:
throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, type.toString()); // fieldTypes has no entry for column i yet, so report the unexpected column type itself
}
}
if (!ok) {
throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, String.format("based on the %d sampled records, the data type of column %d cannot be inferred", records.size(), i + 1));
}
}
LOG.info("Field Types: {}", fieldTypes);
}
/**
* Execute a batched write for a single table
*
@ -87,12 +167,10 @@ public class JDBCBatchWriter {
ArrayList<Long> tsList = records.stream().map(r -> r.getColumn(tsColIndex).asDate().getTime()).collect(Collectors.toCollection(ArrayList::new));
stmt.setTimestamp(0, tsList);
// ordinary fields
Record record = records.get(0);
for (int i = 0; i < fieldList.size(); ) {
String fieldName = fieldList.get(i);
int index = fieldIndexMap.get(fieldName);
Column column = record.getColumn(index);
switch (column.getType()) {
switch (fieldTypes.get(i)) {
case LONG:
ArrayList<Long> lisLong = records.stream().map(r -> r.getColumn(index).asBigInteger().longValue()).collect(Collectors.toCollection(ArrayList::new));
stmt.setLong(++i, lisLong);
@ -118,7 +196,7 @@ public class JDBCBatchWriter {
stmt.setString(++i, lisBytes, 64);
break;
default:
throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, column.getType().toString());
throw DataXException.asDataXException(TDengineWriterErrorCode.TYPE_ERROR, fieldTypes.get(i).toString());
}
}
// execute the batch
@ -132,6 +210,16 @@ public class JDBCBatchWriter {
* Write out all buffered Records
*/
public void flush() throws SQLException {
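// If no buffer reached batchSize before the job ended, the field types have not been
// inferred yet; gather buffered records (stopping once more than 100 have been collected)
// and infer the types from that sample before flushing.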
if (fieldTypes == null) {
List<Record> records = new ArrayList<>();
for (List<Record> lis : buf.values()) {
records.addAll(lis);
if (records.size() > 100) {
break;
}
}
initFiledTypesAndTargetTable(records);
}
for (String tabName : buf.keySet()) {
if (buf.get(tabName).size() > 0) {
executeBatch(tabName);

View File

@ -4,6 +4,7 @@ import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -14,7 +15,7 @@ public class OpentsdbDataHandler implements DataHandler {
private static final String DEFAULT_BATCH_SIZE = "1";
@Override
public long handle(RecordReceiver lineReceiver, Properties properties) {
public long handle(RecordReceiver lineReceiver, Properties properties, TaskPluginCollector collector) {
// opentsdb json protocol use JNI and schemaless API to write
String host = properties.getProperty(Key.HOST);
int port = Integer.parseInt(properties.getProperty(Key.PORT));

View File

@ -176,14 +176,15 @@ public class SchemaManager {
return stables;
}
public void createSTable(Connection conn, Record record) throws SQLException {
public void createSTable(Connection conn, List<Column.Type> fieldTypes) throws SQLException {
StringBuilder sb = new StringBuilder();
sb.append("CREATE STABLE IF NOT EXISTS ").append(stable).append("(");
sb.append(tsColName).append(" ").append("TIMESTAMP,");
for (String fieldName : fieldList) {
for (int i = 0; i < fieldList.size(); ++i) {
String fieldName = fieldList.get(i);
Column.Type dxType = fieldTypes.get(i);
sb.append(fieldName).append(' ');
Column col = record.getColumn(fieldIndexMap.get(fieldName));
String tdType = mapDataxType(col.getType());
String tdType = mapDataxType(dxType);
sb.append(tdType).append(',');
}
sb.deleteCharAt(sb.length() - 1);
@ -209,10 +210,22 @@ public class SchemaManager {
int tagIndex = tagIndexMap.get(tagList.get(i));
tagValues[i] = record.getColumn(tagIndex).asString();
}
if (tagValues[i] == null) {
return null;
}
}
return tagValues;
}
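// A record has a valid timestamp only if the timestamp column is of DATE type and non-null.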
public boolean hasTimestamp(Record record) {
Column column = record.getColumn(tsColIndex);
if (column.getType() == Column.Type.DATE && column.asDate() != null) {
return true;
} else {
return false;
}
}
public Map<String, Integer> getFieldIndexMap() {
return fieldIndexMap;
}
@ -252,4 +265,8 @@ public class SchemaManager {
String s = String.join("!", tagValues);
return "t_" + DigestUtils.md5Hex(s);
}
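// Number of tag columns whose values are taken from the record; used by the writer
// when computing the minimum expected column count.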
public int getDynamicTagCount() {
return tagIndexMap.size();
}
}

View File

@ -74,7 +74,7 @@ public class TDengineWriter extends Writer {
String peerPluginName = this.writerSliceConfig.getString(PEER_PLUGIN_NAME);
LOG.debug("start to handle record from: " + peerPluginName);
DataHandler handler = DataHandlerFactory.build(peerPluginName);
long records = handler.handle(lineReceiver, properties);
long records = handler.handle(lineReceiver, properties, getTaskPluginCollector());
LOG.debug("handle data finished, records: " + records);
}

View File

@ -5,6 +5,7 @@ import org.junit.Test;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;
public class TDengineWriterTest {
@ -18,4 +19,13 @@ public class TDengineWriterTest {
schemaManager.setStable("test1");
schemaManager.getFromDB(conn);
}
@Test
public void dropTestTable() throws ClassNotFoundException, SQLException {
Class.forName("com.taosdata.jdbc.TSDBDriver");
String jdbcUrl = String.format("jdbc:TAOS://%s:%s/%s?user=%s&password=%s", "wozai.fun", "6030", "test", "root", "taosdata");
Connection conn = DriverManager.getConnection(jdbcUrl);
Statement stmt = conn.createStatement();
stmt.execute("drop table market_snapshot");
}
}