[TS-1281]<feature>: collect dirty data in tdenginewriter

Commit 2ee8878da1 by zyyang, 2022-03-08 14:57:03 +08:00 (parent fa569937f2)
6 changed files with 186 additions and 25 deletions

DefaultDataHandler.java

@@ -30,6 +30,7 @@ public class DefaultDataHandler implements DataHandler {
     private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);
+    private final TaskPluginCollector taskPluginCollector;

     private String username;
     private String password;
     private String jdbcUrl;
@@ -56,7 +57,7 @@ public class DefaultDataHandler implements DataHandler {
     private Map<String, List<ColumnMeta>> columnMetas;

-    public DefaultDataHandler(Configuration configuration) {
+    public DefaultDataHandler(Configuration configuration, TaskPluginCollector taskPluginCollector) {
         this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME);
         this.password = configuration.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD);
         this.jdbcUrl = configuration.getString(Key.JDBC_URL);
@@ -64,6 +65,7 @@ public class DefaultDataHandler implements DataHandler {
         this.tables = configuration.getList(Key.TABLE, String.class);
         this.columns = configuration.getList(Key.COLUMN, String.class);
         this.ignoreTagsUnmatched = configuration.getBool(Key.IGNORE_TAGS_UNMATCHED, Constants.DEFAULT_IGNORE_TAGS_UNMATCHED);
+        this.taskPluginCollector = taskPluginCollector;
     }

     @Override
@@ -86,14 +88,24 @@ public class DefaultDataHandler implements DataHandler {
                 if (i % batchSize != 0) {
                     recordBatch.add(record);
                 } else {
+                    try {
                         affectedRows = writeBatch(conn, recordBatch);
+                    } catch (SQLException e) {
+                        LOG.warn("use one row insert. because:" + e.getMessage());
+                        affectedRows = writeEachRow(conn, recordBatch);
+                    }
                     recordBatch.clear();
                 }
                 count++;
             }

             if (!recordBatch.isEmpty()) {
+                try {
                     affectedRows = writeBatch(conn, recordBatch);
+                } catch (SQLException e) {
+                    LOG.warn("use one row insert. because:" + e.getMessage());
+                    affectedRows = writeEachRow(conn, recordBatch);
+                }
                 recordBatch.clear();
             }
         } catch (SQLException e) {
@@ -107,6 +119,21 @@ public class DefaultDataHandler implements DataHandler {
         return affectedRows;
     }

+    private int writeEachRow(Connection conn, List<Record> recordBatch) {
+        int affectedRows = 0;
+        for (Record record : recordBatch) {
+            List<Record> recordList = new ArrayList<>();
+            recordList.add(record);
+            try {
+                affectedRows += writeBatch(conn, recordList);
+            } catch (SQLException e) {
+                LOG.error(e.getMessage());
+                this.taskPluginCollector.collectDirtyRecord(record, e);
+            }
+        }
+        return affectedRows;
+    }
+
     /**
      * table: [ "stb1", "stb2", "tb1", "tb2", "t1" ]
      * stb1[ts,f1,f2] tags:[t1]
@@ -118,7 +145,7 @@ public class DefaultDataHandler implements DataHandler {
      * 3. For a sub-table (tb), build the SQL, e.g. data: [ts, f1, f2, f3, t1, t2], tbColumn: [ts, f1, f2, t1] => insert into tb(ts, f1, f2) values(ts, f1, f2)
      * 4. For a normal table (t), build the SQL, e.g. data: [ts, f1, f2, f3, t1, t2], tbColumn: [ts, f1, f2, f3, t1, t2] => insert into t(ts, f1, f2, f3, t1, t2) values(ts, f1, f2, f3, t1, t2)
      */
-    public int writeBatch(Connection conn, List<Record> recordBatch) {
+    public int writeBatch(Connection conn, List<Record> recordBatch) throws SQLException {
         int affectedRows = 0;
         for (String table : tables) {
             TableMeta tableMeta = tableMetas.get(table);
@@ -146,7 +173,7 @@ public class DefaultDataHandler implements DataHandler {
      * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
      * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
      */
-    private int writeBatchToSupTableBySQL(Connection conn, String table, List<Record> recordBatch) {
+    private int writeBatchToSupTableBySQL(Connection conn, String table, List<Record> recordBatch) throws SQLException {
         List<ColumnMeta> columnMetas = this.columnMetas.get(table);

         StringBuilder sb = new StringBuilder("insert into");
@@ -177,13 +204,11 @@ public class DefaultDataHandler implements DataHandler {
         return executeUpdate(conn, sql);
     }

-    private int executeUpdate(Connection conn, String sql) throws DataXException {
+    private int executeUpdate(Connection conn, String sql) throws SQLException {
         int count;
         try (Statement stmt = conn.createStatement()) {
             LOG.debug(">>> " + sql);
             count = stmt.executeUpdate(sql);
-        } catch (SQLException e) {
-            throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
         }
         return count;
     }
@@ -227,7 +252,7 @@ public class DefaultDataHandler implements DataHandler {
      * table: ["stb1"], column: ["ts", "f1", "f2", "t1"]
      * data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => schemaless: stb1,t1=t1 f1=f1,f2=f2 ts
      */
-    private int writeBatchToSupTableBySchemaless(Connection conn, String table, List<Record> recordBatch) {
+    private int writeBatchToSupTableBySchemaless(Connection conn, String table, List<Record> recordBatch) throws SQLException {
         int count = 0;

         TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
@@ -296,11 +321,8 @@ public class DefaultDataHandler implements DataHandler {
             default:
                 timestampType = SchemalessTimestampType.NOT_CONFIGURED;
         }
-        try {
-            writer.write(lines, SchemalessProtocolType.LINE, timestampType);
-        } catch (SQLException e) {
-            throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
-        }
+        writer.write(lines, SchemalessProtocolType.LINE, timestampType);

         LOG.warn("schemalessWriter does not return affected rows!");
         return count;
@@ -370,7 +392,7 @@ public class DefaultDataHandler implements DataHandler {
      * else
      * insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)])
      */
-    private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) {
+    private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
         List<ColumnMeta> columnMetas = this.columnMetas.get(table);

         StringBuilder sb = new StringBuilder();
@@ -440,7 +462,7 @@ public class DefaultDataHandler implements DataHandler {
      * table: ["weather"], column: ["ts, f1, f2, f3, t1, t2"]
      * sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...)
      */
-    private int writeBatchToNormalTable(Connection conn, String table, List<Record> recordBatch) {
+    private int writeBatchToNormalTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
         List<ColumnMeta> columnMetas = this.columnMetas.get(table);

         StringBuilder sb = new StringBuilder();
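
The heart of this change is the batch-then-row fallback above: writeBatch now propagates SQLException instead of wrapping it, and the caller retries a failed batch one record at a time, handing unwritable rows to DataX's TaskPluginCollector rather than aborting the task. A minimal, self-contained sketch of the same pattern follows; the Record and collector types here are simplified stand-ins for illustration, not the real DataX interfaces:

```java
import java.sql.Connection;
import java.sql.SQLException;
import java.util.Collections;
import java.util.List;

public class BatchWithRowFallback {
    // Simplified stand-in for com.alibaba.datax.common.plugin.TaskPluginCollector
    interface DirtyCollector {
        void collectDirtyRecord(Object record, Exception cause);
    }

    private final DirtyCollector collector;

    public BatchWithRowFallback(DirtyCollector collector) {
        this.collector = collector;
    }

    /** Fast path: insert the whole batch; on failure, isolate bad rows one by one. */
    public int write(Connection conn, List<Object> batch) {
        try {
            return writeBatch(conn, batch);
        } catch (SQLException batchError) {
            int affected = 0;
            for (Object record : batch) {
                try {
                    affected += writeBatch(conn, Collections.singletonList(record));
                } catch (SQLException rowError) {
                    // Dirty row: report it and keep writing the rest of the batch.
                    collector.collectDirtyRecord(record, rowError);
                }
            }
            return affected;
        }
    }

    // Stand-in for DefaultDataHandler.writeBatch, which builds and runs the INSERT.
    private int writeBatch(Connection conn, List<Object> records) throws SQLException {
        throw new SQLException("omitted in this sketch");
    }
}
```

The one-row retry trades throughput for precision only on the failure path, so the common case keeps the multi-row insert.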

TDengineWriter.java

@@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.writer.tdenginewriter;

 import com.alibaba.datax.common.exception.DataXException;
 import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
 import com.alibaba.datax.common.spi.Writer;
 import com.alibaba.datax.common.util.Configuration;
 import org.apache.commons.lang3.StringUtils;
@@ -81,10 +82,12 @@ public class TDengineWriter extends Writer {
         private static final Logger LOG = LoggerFactory.getLogger(Task.class);

         private Configuration writerSliceConfig;
+        private TaskPluginCollector taskPluginCollector;

         @Override
         public void init() {
             this.writerSliceConfig = getPluginJobConf();
+            this.taskPluginCollector = super.getTaskPluginCollector();
         }

         @Override
@@ -101,7 +104,7 @@ public class TDengineWriter extends Writer {
             if (peerPluginName.equals("opentsdbreader"))
                 handler = new OpentsdbDataHandler(this.writerSliceConfig);
             else
-                handler = new DefaultDataHandler(this.writerSliceConfig);
+                handler = new DefaultDataHandler(this.writerSliceConfig, this.taskPluginCollector);
             long records = handler.handle(lineReceiver, getTaskPluginCollector());
             LOG.debug("handle data finished, records: " + records);

Csv2TDengineTest.java (new file)

@@ -0,0 +1,41 @@
package com.alibaba.datax.plugin.writer.tdenginewriter;

import com.alibaba.datax.core.Engine;
import org.junit.Ignore;
import org.junit.Test;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.sql.Statement;

@Ignore
public class Csv2TDengineTest {

    private static final String host = "192.168.56.105";

    @Test
    public void case01() throws Throwable {
        // given
        prepareTable();

        // when
        String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/csv2t.json"};
        System.setProperty("datax.home", "../target/datax/datax");
        Engine.entry(params);
    }

    public void prepareTable() throws SQLException {
        final String url = "jdbc:TAOS-RS://" + host + ":6041";
        try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) {
            Statement stmt = conn.createStatement();
            stmt.execute("drop database if exists test");
            stmt.execute("create database if not exists test");
            stmt.execute("create table test.weather (ts timestamp, temperature bigint, humidity double, is_normal bool) " +
                    "tags(device_id binary(10), address nchar(10))");
        }
    }
}

DefaultDataHandlerTest.java

@@ -4,6 +4,8 @@ import com.alibaba.datax.common.element.DateColumn;
 import com.alibaba.datax.common.element.LongColumn;
 import com.alibaba.datax.common.element.Record;
 import com.alibaba.datax.common.element.StringColumn;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
+import com.alibaba.datax.common.spi.Writer;
 import com.alibaba.datax.common.util.Configuration;
 import com.alibaba.datax.core.transport.record.DefaultRecord;
 import org.junit.AfterClass;
@@ -25,6 +27,8 @@ public class DefaultDataHandlerTest {
     private static final String host = "192.168.1.93";
     private static Connection conn;

+    private final TaskPluginCollector taskPluginCollector = new TDengineWriter.Task().getTaskPluginCollector();
+
     @Test
     public void writeSupTableBySQL() throws SQLException {
         // given
@@ -48,8 +52,9 @@ public class DefaultDataHandlerTest {
             return record;
         }).collect(Collectors.toList());

         // when
-        DefaultDataHandler handler = new DefaultDataHandler(configuration);
+        DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector);
         List<String> tables = configuration.getList("table", String.class);
         SchemaManager schemaManager = new SchemaManager(conn);
         Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
@@ -87,7 +92,7 @@ public class DefaultDataHandlerTest {
         }).collect(Collectors.toList());

         // when
-        DefaultDataHandler handler = new DefaultDataHandler(configuration);
+        DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector);
         List<String> tables = configuration.getList("table", String.class);
         SchemaManager schemaManager = new SchemaManager(conn);
         Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
@@ -127,7 +132,7 @@ public class DefaultDataHandlerTest {
         }).collect(Collectors.toList());

         // when
-        DefaultDataHandler handler = new DefaultDataHandler(configuration);
+        DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector);
         List<String> tables = configuration.getList("table", String.class);
         SchemaManager schemaManager = new SchemaManager(connection);
         Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
@@ -166,7 +171,7 @@ public class DefaultDataHandlerTest {
         }).collect(Collectors.toList());

         // when
-        DefaultDataHandler handler = new DefaultDataHandler(configuration);
+        DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector);
         List<String> tables = configuration.getList("table", String.class);
         SchemaManager schemaManager = new SchemaManager(conn);
         Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
@@ -205,7 +210,7 @@ public class DefaultDataHandlerTest {
         }).collect(Collectors.toList());

         // when
-        DefaultDataHandler handler = new DefaultDataHandler(configuration);
+        DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector);
         List<String> tables = configuration.getList("table", String.class);
         SchemaManager schemaManager = new SchemaManager(conn);
         Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
@@ -244,7 +249,7 @@ public class DefaultDataHandlerTest {
         }).collect(Collectors.toList());

         // when
-        DefaultDataHandler handler = new DefaultDataHandler(configuration);
+        DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector);
         List<String> tables = configuration.getList("table", String.class);
         SchemaManager schemaManager = new SchemaManager(conn);
         Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);

csv2t.json (new file)

@@ -0,0 +1,80 @@
{
  "job": {
    "content": [
      {
        "reader": {
          "name": "txtfilereader",
          "parameter": {
            "path": [
              "/Users/yangzy/IdeaProjects/DataX/tdenginewriter/src/test/resources/weather.csv"
            ],
            "encoding": "UTF-8",
            "column": [
              {
                "index": 0,
                "type": "string"
              },
              {
                "index": 1,
                "type": "date",
                "format": "yyyy-MM-dd HH:mm:ss.SSS"
              },
              {
                "index": 2,
                "type": "long"
              },
              {
                "index": 3,
                "type": "double"
              },
              {
                "index": 4,
                "type": "long"
              },
              {
                "index": 5,
                "type": "string"
              },
              {
                "index": 6,
                "type": "string"
              }
            ],
            "fieldDelimiter": ","
          }
        },
        "writer": {
          "name": "tdenginewriter",
          "parameter": {
            "username": "root",
            "password": "taosdata",
            "column": [
              "tbname",
              "ts",
              "temperature",
              "humidity",
              "is_normal",
              "device_id",
              "address"
            ],
            "connection": [
              {
                "table": [
                  "weather"
                ],
                "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/test"
              }
            ],
            "batchSize": 100,
            "ignoreTagsUnmatched": true
          }
        }
      }
    ],
    "setting": {
      "speed": {
        "channel": 1
      }
    }
  }
}
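
For reference, Csv2TDengineTest above drives this job through the DataX engine in standalone mode; the same invocation works outside JUnit. A minimal launcher sketch mirroring the test (Engine.entry and the -mode/-jobid/-job flags are exactly what the test itself uses; the datax.home and job paths are the ones the test assumes, so adjust them for your checkout):

```java
public class RunCsv2TDengine {
    public static void main(String[] args) throws Throwable {
        // Assumed layout from Csv2TDengineTest: a built DataX distribution under ../target/datax/datax
        System.setProperty("datax.home", "../target/datax/datax");
        com.alibaba.datax.core.Engine.entry(new String[]{
                "-mode", "standalone",
                "-jobid", "-1",
                "-job", "src/test/resources/csv2t.json"
        });
    }
}
```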

weather.csv (new file)

@@ -0,0 +1,10 @@
tb1,2022-02-20 04:05:59.255,5,8.591868744,1,abcABC123,北京朝阳望京
tb1,2022-02-20 04:58:47.068,3,1.489693641,1,abcABC123,北京朝阳望京
tb1,2022-02-20 06:31:09.408,1,4.026500719,1,abcABC123,北京朝阳望京
tb1,2022-02-20 08:08:00.336,1,9.606400360,1,abcABC123,北京朝阳望京
tb1,2022-02-20 08:28:58.053,9,7.872178184,1,abcABC123123,北京朝阳望京
tb1,2022-02-20 10:23:20.836,9,2.699478524,1,abcABC123,北京朝阳望京
tb1,2022-02-20 11:09:59.739,7,7.906723716,1,abcABC123,北京朝阳望京
tb1,2022-02-20 19:08:29.315,1,5.852338895,1,abcABC123,北京朝阳望京
tb1,2022-02-20 22:10:06.243,10,5.535007901,1,abcABC123,北京朝阳望京
tb1,2022-02-20 23:52:43.683,10,10.642013185,1,abcABC123,北京朝阳望京
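
Note that row 5's device_id, abcABC123123 (12 characters), exceeds the binary(10) tag defined in prepareTable, so it appears intended to exercise the new dirty-data path: the batch insert fails, the row-by-row retry isolates this record, and collectDirtyRecord reports it while the other nine rows are written.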