Mirror of https://github.com/alibaba/DataX.git (synced 2025-05-02 10:09:07 +08:00)

Merge pull request #25 from taosdata/dev/zyyang
[TS-1281]<feature>: collect dirty data in tdenginewriter

Commit 99ce616479
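
This merge teaches tdenginewriter to report dirty data instead of aborting the job: a TaskPluginCollector is threaded from TDengineWriter.Task into DefaultDataHandler, writeBatch and its helpers now propagate SQLException instead of wrapping it in a terminal DataXException, and when a batch insert fails the handler retries row by row, handing each failing record to taskPluginCollector.collectDirtyRecord. A CSV-to-TDengine test (Csv2TDengineTest, csv2t.json, weather.csv) is added to exercise the path.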
tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java

@@ -30,6 +30,7 @@ public class DefaultDataHandler implements DataHandler {
     private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class);

+    private final TaskPluginCollector taskPluginCollector;
     private String username;
     private String password;
     private String jdbcUrl;
@@ -56,7 +57,7 @@ public class DefaultDataHandler implements DataHandler {
     private Map<String, List<ColumnMeta>> columnMetas;

-    public DefaultDataHandler(Configuration configuration) {
+    public DefaultDataHandler(Configuration configuration, TaskPluginCollector taskPluginCollector) {
         this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME);
         this.password = configuration.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD);
         this.jdbcUrl = configuration.getString(Key.JDBC_URL);
@@ -64,6 +65,7 @@ public class DefaultDataHandler implements DataHandler {
         this.tables = configuration.getList(Key.TABLE, String.class);
         this.columns = configuration.getList(Key.COLUMN, String.class);
         this.ignoreTagsUnmatched = configuration.getBool(Key.IGNORE_TAGS_UNMATCHED, Constants.DEFAULT_IGNORE_TAGS_UNMATCHED);
+        this.taskPluginCollector = taskPluginCollector;
     }

     @Override
@@ -86,14 +88,24 @@ public class DefaultDataHandler implements DataHandler {
                 if (i % batchSize != 0) {
                     recordBatch.add(record);
                 } else {
-                    affectedRows = writeBatch(conn, recordBatch);
+                    try {
+                        affectedRows = writeBatch(conn, recordBatch);
+                    } catch (SQLException e) {
+                        LOG.warn("use one row insert. because:" + e.getMessage());
+                        affectedRows = writeEachRow(conn, recordBatch);
+                    }
                     recordBatch.clear();
                 }
                 count++;
             }

             if (!recordBatch.isEmpty()) {
-                affectedRows = writeBatch(conn, recordBatch);
+                try {
+                    affectedRows = writeBatch(conn, recordBatch);
+                } catch (SQLException e) {
+                    LOG.warn("use one row insert. because:" + e.getMessage());
+                    affectedRows = writeEachRow(conn, recordBatch);
+                }
                 recordBatch.clear();
             }
         } catch (SQLException e) {
@@ -107,6 +119,21 @@ public class DefaultDataHandler implements DataHandler {
         return affectedRows;
     }

+    private int writeEachRow(Connection conn, List<Record> recordBatch) {
+        int affectedRows = 0;
+        for (Record record : recordBatch) {
+            List<Record> recordList = new ArrayList<>();
+            recordList.add(record);
+            try {
+                affectedRows += writeBatch(conn, recordList);
+            } catch (SQLException e) {
+                LOG.error(e.getMessage());
+                this.taskPluginCollector.collectDirtyRecord(record, e);
+            }
+        }
+        return affectedRows;
+    }
+
     /**
      * table: [ "stb1", "stb2", "tb1", "tb2", "t1" ]
      * stb1[ts,f1,f2] tags:[t1]
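
The two hunks above are the heart of the change: try the whole batch, fall back to row-at-a-time on failure, and hand only the offending rows to the dirty-record collector. A minimal, self-contained sketch of the same control flow, with hypothetical Sink and DirtyCollector interfaces standing in for writeBatch and TaskPluginCollector:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class BatchFallbackSketch {

    // Hypothetical stand-in for DefaultDataHandler.writeBatch(conn, recordBatch).
    interface Sink {
        int writeBatch(List<String> rows) throws Exception;
    }

    // Hypothetical stand-in for TaskPluginCollector.collectDirtyRecord(record, e).
    interface DirtyCollector {
        void collectDirty(String row, Exception cause);
    }

    static int write(Sink sink, DirtyCollector collector, List<String> batch) {
        try {
            // Fast path: insert the whole batch in one statement.
            return sink.writeBatch(batch);
        } catch (Exception batchFailure) {
            // Slow path: retry row by row so one bad row cannot poison the batch.
            int affected = 0;
            for (String row : batch) {
                try {
                    affected += sink.writeBatch(Collections.singletonList(row));
                } catch (Exception rowFailure) {
                    // Only the rows that actually fail become dirty records.
                    collector.collectDirty(row, rowFailure);
                }
            }
            return affected;
        }
    }

    public static void main(String[] args) {
        Sink sink = rows -> {
            if (rows.contains("bad")) throw new Exception("tag value too long");
            return rows.size();
        };
        List<String> dirty = new ArrayList<>();
        int ok = write(sink, (row, e) -> dirty.add(row), Arrays.asList("a", "bad", "c"));
        System.out.println("inserted=" + ok + ", dirty=" + dirty); // inserted=2, dirty=[bad]
    }
}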
@@ -118,7 +145,7 @@ public class DefaultDataHandler implements DataHandler {
      * 3. For a sub-table (tb), build the SQL, e.g.: data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => insert into tb(ts, f1, f2) values(ts, f1, f2)
      * 4. For a normal table (t), build the SQL, e.g.: data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, f3, t1, t2] => insert into t(ts, f1, f2, f3, t1, t2) values(ts, f1, f2, f3, t1, t2)
      */
-    public int writeBatch(Connection conn, List<Record> recordBatch) {
+    public int writeBatch(Connection conn, List<Record> recordBatch) throws SQLException {
         int affectedRows = 0;
         for (String table : tables) {
             TableMeta tableMeta = tableMetas.get(table);
@@ -146,7 +173,7 @@ public class DefaultDataHandler implements DataHandler {
      * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
      */
-    private int writeBatchToSupTableBySQL(Connection conn, String table, List<Record> recordBatch) {
+    private int writeBatchToSupTableBySQL(Connection conn, String table, List<Record> recordBatch) throws SQLException {
         List<ColumnMeta> columnMetas = this.columnMetas.get(table);

         StringBuilder sb = new StringBuilder("insert into");
@@ -177,13 +204,11 @@ public class DefaultDataHandler implements DataHandler {
         return executeUpdate(conn, sql);
     }

-    private int executeUpdate(Connection conn, String sql) throws DataXException {
+    private int executeUpdate(Connection conn, String sql) throws SQLException {
         int count;
         try (Statement stmt = conn.createStatement()) {
             LOG.debug(">>> " + sql);
             count = stmt.executeUpdate(sql);
-        } catch (SQLException e) {
-            throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
         }
         return count;
     }
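
Letting executeUpdate and the writeBatchTo* helpers throw SQLException, rather than swallowing it into a terminal DataXException, is what makes the fallback possible: the caller in handle() can now catch the batch failure, down-shift to single-row inserts, and collect only the genuinely dirty rows.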
@@ -227,7 +252,7 @@ public class DefaultDataHandler implements DataHandler {
      * table: ["stb1"], column: ["ts", "f1", "f2", "t1"]
      * data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => schemaless: stb1,t1=t1 f1=f1,f2=f2 ts
      */
-    private int writeBatchToSupTableBySchemaless(Connection conn, String table, List<Record> recordBatch) {
+    private int writeBatchToSupTableBySchemaless(Connection conn, String table, List<Record> recordBatch) throws SQLException {
         int count = 0;
         TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
@@ -296,11 +321,8 @@ public class DefaultDataHandler implements DataHandler {
             default:
                 timestampType = SchemalessTimestampType.NOT_CONFIGURED;
         }
-        try {
-            writer.write(lines, SchemalessProtocolType.LINE, timestampType);
-        } catch (SQLException e) {
-            throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
-        }
+
+        writer.write(lines, SchemalessProtocolType.LINE, timestampType);

         LOG.warn("schemalessWriter does not return affected rows!");
         return count;
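
The schemaless path gets the same treatment: the write no longer wraps SQLException in a DataXException, so failures bubble up to the batch-to-row fallback just like the SQL path. The Javadoc example above, stb1,t1=t1 f1=f1,f2=f2 ts, is InfluxDB-style line protocol: measurement plus tag set, a space, the field set, a space, the timestamp.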
@@ -370,7 +392,7 @@ public class DefaultDataHandler implements DataHandler {
      * else
      *     insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)])
      */
-    private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) {
+    private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
         List<ColumnMeta> columnMetas = this.columnMetas.get(table);

         StringBuilder sb = new StringBuilder();
@@ -440,7 +462,7 @@ public class DefaultDataHandler implements DataHandler {
      * table: ["weather"], column: ["ts, f1, f2, f3, t1, t2"]
      * sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...)
      */
-    private int writeBatchToNormalTable(Connection conn, String table, List<Record> recordBatch) {
+    private int writeBatchToNormalTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
         List<ColumnMeta> columnMetas = this.columnMetas.get(table);

         StringBuilder sb = new StringBuilder();
tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java

@@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.writer.tdenginewriter;

 import com.alibaba.datax.common.exception.DataXException;
 import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
 import com.alibaba.datax.common.spi.Writer;
 import com.alibaba.datax.common.util.Configuration;
 import org.apache.commons.lang3.StringUtils;
@@ -81,10 +82,12 @@ public class TDengineWriter extends Writer {
         private static final Logger LOG = LoggerFactory.getLogger(Task.class);

         private Configuration writerSliceConfig;
+        private TaskPluginCollector taskPluginCollector;

         @Override
         public void init() {
             this.writerSliceConfig = getPluginJobConf();
+            this.taskPluginCollector = super.getTaskPluginCollector();
         }

         @Override
@@ -101,7 +104,7 @@ public class TDengineWriter extends Writer {
             if (peerPluginName.equals("opentsdbreader"))
                 handler = new OpentsdbDataHandler(this.writerSliceConfig);
             else
-                handler = new DefaultDataHandler(this.writerSliceConfig);
+                handler = new DefaultDataHandler(this.writerSliceConfig, this.taskPluginCollector);

             long records = handler.handle(lineReceiver, getTaskPluginCollector());
             LOG.debug("handle data finished, records: " + records);
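
The collector is grabbed once in Task.init() and injected into DefaultDataHandler, so the handler can report dirty rows without reaching back into the framework; the OpentsdbDataHandler branch is left unchanged.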
tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Csv2TDengineTest.java (new file)

@@ -0,0 +1,41 @@
+package com.alibaba.datax.plugin.writer.tdenginewriter;
+
+import com.alibaba.datax.core.Engine;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+@Ignore
+public class Csv2TDengineTest {
+
+    private static final String host = "192.168.56.105";
+
+    @Test
+    public void case01() throws Throwable {
+        // given
+        prepareTable();
+
+        // when
+        String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/csv2t.json"};
+        System.setProperty("datax.home", "../target/datax/datax");
+        Engine.entry(params);
+    }
+
+    public void prepareTable() throws SQLException {
+        final String url = "jdbc:TAOS-RS://" + host + ":6041";
+        try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) {
+            Statement stmt = conn.createStatement();
+
+            stmt.execute("drop database if exists test");
+            stmt.execute("create database if not exists test");
+            stmt.execute("create table test.weather (ts timestamp, temperature bigint, humidity double, is_normal bool) " +
+                    "tags(device_id binary(10),address nchar(10))");
+        }
+    }
+
+}
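
The test is @Ignore'd because it needs a live TDengine instance at the hard-coded host; to run it, point host and the jdbcUrl in csv2t.json at a reachable server and drop the annotation. Engine.entry() drives a full standalone DataX job, so the packaged plugin layout under datax.home must already exist from the project's Maven assembly build.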
tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java

@@ -4,6 +4,8 @@ import com.alibaba.datax.common.element.DateColumn;
 import com.alibaba.datax.common.element.LongColumn;
 import com.alibaba.datax.common.element.Record;
 import com.alibaba.datax.common.element.StringColumn;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
+import com.alibaba.datax.common.spi.Writer;
 import com.alibaba.datax.common.util.Configuration;
 import com.alibaba.datax.core.transport.record.DefaultRecord;
 import org.junit.AfterClass;
@@ -25,6 +27,8 @@ public class DefaultDataHandlerTest {
     private static final String host = "192.168.1.93";
     private static Connection conn;

+    private final TaskPluginCollector taskPluginCollector = new TDengineWriter.Task().getTaskPluginCollector();
+
     @Test
     public void writeSupTableBySQL() throws SQLException {
         // given
@@ -48,8 +52,9 @@ public class DefaultDataHandlerTest {
             return record;
         }).collect(Collectors.toList());

+
         // when
-        DefaultDataHandler handler = new DefaultDataHandler(configuration);
+        DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector);
         List<String> tables = configuration.getList("table", String.class);
         SchemaManager schemaManager = new SchemaManager(conn);
         Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
@@ -87,7 +92,7 @@ public class DefaultDataHandlerTest {
         }).collect(Collectors.toList());

         // when
-        DefaultDataHandler handler = new DefaultDataHandler(configuration);
+        DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector);
         List<String> tables = configuration.getList("table", String.class);
         SchemaManager schemaManager = new SchemaManager(conn);
         Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
@@ -127,7 +132,7 @@ public class DefaultDataHandlerTest {
         }).collect(Collectors.toList());

         // when
-        DefaultDataHandler handler = new DefaultDataHandler(configuration);
+        DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector);
         List<String> tables = configuration.getList("table", String.class);
         SchemaManager schemaManager = new SchemaManager(connection);
         Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
@@ -166,7 +171,7 @@ public class DefaultDataHandlerTest {
         }).collect(Collectors.toList());

         // when
-        DefaultDataHandler handler = new DefaultDataHandler(configuration);
+        DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector);
         List<String> tables = configuration.getList("table", String.class);
         SchemaManager schemaManager = new SchemaManager(conn);
         Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
@@ -205,7 +210,7 @@ public class DefaultDataHandlerTest {
         }).collect(Collectors.toList());

         // when
-        DefaultDataHandler handler = new DefaultDataHandler(configuration);
+        DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector);
         List<String> tables = configuration.getList("table", String.class);
         SchemaManager schemaManager = new SchemaManager(conn);
         Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
@@ -244,7 +249,7 @@ public class DefaultDataHandlerTest {
         }).collect(Collectors.toList());

         // when
-        DefaultDataHandler handler = new DefaultDataHandler(configuration);
+        DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector);
         List<String> tables = configuration.getList("table", String.class);
         SchemaManager schemaManager = new SchemaManager(conn);
         Map<String, TableMeta> tableMetas = schemaManager.loadTableMeta(tables);
@@ -260,7 +265,7 @@ public class DefaultDataHandlerTest {
     }

     private void createSupAndSubTable() throws SQLException {
-        try(Statement stmt = conn.createStatement()){
+        try (Statement stmt = conn.createStatement()) {
         stmt.execute("drop database if exists scm_test");
         stmt.execute("create database if not exists scm_test");
         stmt.execute("use scm_test");
@@ -275,7 +280,7 @@ public class DefaultDataHandlerTest {
     }

     private void createSupTable() throws SQLException {
-        try (Statement stmt = conn.createStatement()){
+        try (Statement stmt = conn.createStatement()) {
         stmt.execute("drop database if exists scm_test");
         stmt.execute("create database if not exists scm_test");
         stmt.execute("use scm_test");
tdenginewriter/src/test/resources/csv2t.json (new file, 80 lines)

@@ -0,0 +1,80 @@
+{
+  "job": {
+    "content": [
+      {
+        "reader": {
+          "name": "txtfilereader",
+          "parameter": {
+            "path": [
+              "/Users/yangzy/IdeaProjects/DataX/tdenginewriter/src/test/resources/weather.csv"
+            ],
+            "encoding": "UTF-8",
+            "column": [
+              {
+                "index": 0,
+                "type": "string"
+              },
+              {
+                "index": 1,
+                "type": "date",
+                "format": "yyyy-MM-dd HH:mm:ss.SSS"
+              },
+              {
+                "index": 2,
+                "type": "long"
+              },
+              {
+                "index": 3,
+                "type": "double"
+              },
+              {
+                "index": 4,
+                "type": "long"
+              },
+              {
+                "index": 5,
+                "type": "string"
+              },
+              {
+                "index": 6,
+                "type": "string"
+              }
+            ],
+            "fieldDelimiter": ","
+          }
+        },
+        "writer": {
+          "name": "tdenginewriter",
+          "parameter": {
+            "username": "root",
+            "password": "taosdata",
+            "column": [
+              "tbname",
+              "ts",
+              "temperature",
+              "humidity",
+              "is_normal",
+              "device_id",
+              "address"
+            ],
+            "connection": [
+              {
+                "table": [
+                  "weather"
+                ],
+                "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/test"
+              }
+            ],
+            "batchSize": 100,
+            "ignoreTagsUnmatched": true
+          }
+        }
+      }
+    ],
+    "setting": {
+      "speed": {
+        "channel": 1
+      }
+    }
+  }
+}
tdenginewriter/src/test/resources/weather.csv (new file, 10 lines)

@@ -0,0 +1,10 @@
+tb1,2022-02-20 04:05:59.255,5,8.591868744,1,abcABC123,北京朝阳望京
+tb1,2022-02-20 04:58:47.068,3,1.489693641,1,abcABC123,北京朝阳望京
+tb1,2022-02-20 06:31:09.408,1,4.026500719,1,abcABC123,北京朝阳望京
+tb1,2022-02-20 08:08:00.336,1,9.606400360,1,abcABC123,北京朝阳望京
+tb1,2022-02-20 08:28:58.053,9,7.872178184,1,abcABC123123,北京朝阳望京
+tb1,2022-02-20 10:23:20.836,9,2.699478524,1,abcABC123,北京朝阳望京
+tb1,2022-02-20 11:09:59.739,7,7.906723716,1,abcABC123,北京朝阳望京
+tb1,2022-02-20 19:08:29.315,1,5.852338895,1,abcABC123,北京朝阳望京
+tb1,2022-02-20 22:10:06.243,10,5.535007901,1,abcABC123,北京朝阳望京
+tb1,2022-02-20 23:52:43.683,10,10.642013185,1,abcABC123,北京朝阳望京
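
Note the fifth row's device_id, abcABC123123: at 12 bytes it exceeds the binary(10) tag declared in prepareTable(), so it is presumably the planted bad row that exercises the new fallback and lands in the dirty-record collector instead of killing the job.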