diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java index a8196153..7854afee 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandler.java @@ -30,6 +30,7 @@ public class DefaultDataHandler implements DataHandler { private static final Logger LOG = LoggerFactory.getLogger(DefaultDataHandler.class); + private final TaskPluginCollector taskPluginCollector; private String username; private String password; private String jdbcUrl; @@ -56,7 +57,7 @@ public class DefaultDataHandler implements DataHandler { private Map> columnMetas; - public DefaultDataHandler(Configuration configuration) { + public DefaultDataHandler(Configuration configuration, TaskPluginCollector taskPluginCollector) { this.username = configuration.getString(Key.USERNAME, Constants.DEFAULT_USERNAME); this.password = configuration.getString(Key.PASSWORD, Constants.DEFAULT_PASSWORD); this.jdbcUrl = configuration.getString(Key.JDBC_URL); @@ -64,6 +65,7 @@ public class DefaultDataHandler implements DataHandler { this.tables = configuration.getList(Key.TABLE, String.class); this.columns = configuration.getList(Key.COLUMN, String.class); this.ignoreTagsUnmatched = configuration.getBool(Key.IGNORE_TAGS_UNMATCHED, Constants.DEFAULT_IGNORE_TAGS_UNMATCHED); + this.taskPluginCollector = taskPluginCollector; } @Override @@ -86,14 +88,24 @@ public class DefaultDataHandler implements DataHandler { if (i % batchSize != 0) { recordBatch.add(record); } else { - affectedRows = writeBatch(conn, recordBatch); + try { + affectedRows = writeBatch(conn, recordBatch); + } catch (SQLException e) { + LOG.warn("use one row insert. because:" + e.getMessage()); + affectedRows = writeEachRow(conn, recordBatch); + } recordBatch.clear(); } count++; } if (!recordBatch.isEmpty()) { - affectedRows = writeBatch(conn, recordBatch); + try { + affectedRows = writeBatch(conn, recordBatch); + } catch (SQLException e) { + LOG.warn("use one row insert. because:" + e.getMessage()); + affectedRows = writeEachRow(conn, recordBatch); + } recordBatch.clear(); } } catch (SQLException e) { @@ -107,6 +119,21 @@ public class DefaultDataHandler implements DataHandler { return affectedRows; } + private int writeEachRow(Connection conn, List recordBatch) { + int affectedRows = 0; + for (Record record : recordBatch) { + List recordList = new ArrayList<>(); + recordList.add(record); + try { + affectedRows += writeBatch(conn, recordList); + } catch (SQLException e) { + LOG.error(e.getMessage()); + this.taskPluginCollector.collectDirtyRecord(record, e); + } + } + return affectedRows; + } + /** * table: [ "stb1", "stb2", "tb1", "tb2", "t1" ] * stb1[ts,f1,f2] tags:[t1] @@ -118,7 +145,7 @@ public class DefaultDataHandler implements DataHandler { * 3. 对于tb,拼sql,例如:data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => insert into tb(ts, f1, f2) values(ts, f1, f2) * 4. 对于t,拼sql,例如:data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, f3, t1, t2] insert into t(ts, f1, f2, f3, t1, t2) values(ts, f1, f2, f3, t1, t2) */ - public int writeBatch(Connection conn, List recordBatch) { + public int writeBatch(Connection conn, List recordBatch) throws SQLException { int affectedRows = 0; for (String table : tables) { TableMeta tableMeta = tableMetas.get(table); @@ -146,7 +173,7 @@ public class DefaultDataHandler implements DataHandler { * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], ) * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], ) */ - private int writeBatchToSupTableBySQL(Connection conn, String table, List recordBatch) { + private int writeBatchToSupTableBySQL(Connection conn, String table, List recordBatch) throws SQLException { List columnMetas = this.columnMetas.get(table); StringBuilder sb = new StringBuilder("insert into"); @@ -177,13 +204,11 @@ public class DefaultDataHandler implements DataHandler { return executeUpdate(conn, sql); } - private int executeUpdate(Connection conn, String sql) throws DataXException { + private int executeUpdate(Connection conn, String sql) throws SQLException { int count; try (Statement stmt = conn.createStatement()) { LOG.debug(">>> " + sql); count = stmt.executeUpdate(sql); - } catch (SQLException e) { - throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage()); } return count; } @@ -227,7 +252,7 @@ public class DefaultDataHandler implements DataHandler { * table: ["stb1"], column: ["ts", "f1", "f2", "t1"] * data: [ts, f1, f2, f3, t1, t2] tbColumn: [ts, f1, f2, t1] => schemaless: stb1,t1=t1 f1=f1,f2=f2 ts */ - private int writeBatchToSupTableBySchemaless(Connection conn, String table, List recordBatch) { + private int writeBatchToSupTableBySchemaless(Connection conn, String table, List recordBatch) throws SQLException { int count = 0; TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision(); @@ -296,11 +321,8 @@ public class DefaultDataHandler implements DataHandler { default: timestampType = SchemalessTimestampType.NOT_CONFIGURED; } - try { - writer.write(lines, SchemalessProtocolType.LINE, timestampType); - } catch (SQLException e) { - throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage()); - } + + writer.write(lines, SchemalessProtocolType.LINE, timestampType); LOG.warn("schemalessWriter does not return affected rows!"); return count; @@ -370,7 +392,7 @@ public class DefaultDataHandler implements DataHandler { * else * insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)]) */ - private int writeBatchToSubTable(Connection conn, String table, List recordBatch) { + private int writeBatchToSubTable(Connection conn, String table, List recordBatch) throws SQLException { List columnMetas = this.columnMetas.get(table); StringBuilder sb = new StringBuilder(); @@ -440,7 +462,7 @@ public class DefaultDataHandler implements DataHandler { * table: ["weather"], column: ["ts, f1, f2, f3, t1, t2"] * sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...) */ - private int writeBatchToNormalTable(Connection conn, String table, List recordBatch) { + private int writeBatchToNormalTable(Connection conn, String table, List recordBatch) throws SQLException { List columnMetas = this.columnMetas.get(table); StringBuilder sb = new StringBuilder(); diff --git a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java index eb538022..73982744 100644 --- a/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java +++ b/tdenginewriter/src/main/java/com/alibaba/datax/plugin/writer/tdenginewriter/TDengineWriter.java @@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.writer.tdenginewriter; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; import com.alibaba.datax.common.spi.Writer; import com.alibaba.datax.common.util.Configuration; import org.apache.commons.lang3.StringUtils; @@ -81,10 +82,12 @@ public class TDengineWriter extends Writer { private static final Logger LOG = LoggerFactory.getLogger(Task.class); private Configuration writerSliceConfig; + private TaskPluginCollector taskPluginCollector; @Override public void init() { this.writerSliceConfig = getPluginJobConf(); + this.taskPluginCollector = super.getTaskPluginCollector(); } @Override @@ -101,7 +104,7 @@ public class TDengineWriter extends Writer { if (peerPluginName.equals("opentsdbreader")) handler = new OpentsdbDataHandler(this.writerSliceConfig); else - handler = new DefaultDataHandler(this.writerSliceConfig); + handler = new DefaultDataHandler(this.writerSliceConfig, this.taskPluginCollector); long records = handler.handle(lineReceiver, getTaskPluginCollector()); LOG.debug("handle data finished, records: " + records); diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Csv2TDengineTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Csv2TDengineTest.java new file mode 100644 index 00000000..7352c3ca --- /dev/null +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/Csv2TDengineTest.java @@ -0,0 +1,41 @@ +package com.alibaba.datax.plugin.writer.tdenginewriter; + +import com.alibaba.datax.core.Engine; +import org.junit.Ignore; +import org.junit.Test; + +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.sql.Statement; + +@Ignore +public class Csv2TDengineTest { + + private static final String host = "192.168.56.105"; + + @Test + public void case01() throws Throwable { + // given + prepareTable(); + + // when + String[] params = {"-mode", "standalone", "-jobid", "-1", "-job", "src/test/resources/csv2t.json"}; + System.setProperty("datax.home", "../target/datax/datax"); + Engine.entry(params); + } + + public void prepareTable() throws SQLException { + final String url = "jdbc:TAOS-RS://" + host + ":6041"; + try (Connection conn = DriverManager.getConnection(url, "root", "taosdata")) { + Statement stmt = conn.createStatement(); + + stmt.execute("drop database if exists test"); + stmt.execute("create database if not exists test"); + stmt.execute("create table test.weather (ts timestamp, temperature bigint, humidity double, is_normal bool) " + + "tags(device_id binary(10),address nchar(10))"); + } + } + + +} diff --git a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java index 3657a4f7..8840aa28 100644 --- a/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java +++ b/tdenginewriter/src/test/java/com/alibaba/datax/plugin/writer/tdenginewriter/DefaultDataHandlerTest.java @@ -4,6 +4,8 @@ import com.alibaba.datax.common.element.DateColumn; import com.alibaba.datax.common.element.LongColumn; import com.alibaba.datax.common.element.Record; import com.alibaba.datax.common.element.StringColumn; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.common.spi.Writer; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.core.transport.record.DefaultRecord; import org.junit.AfterClass; @@ -25,6 +27,8 @@ public class DefaultDataHandlerTest { private static final String host = "192.168.1.93"; private static Connection conn; + private final TaskPluginCollector taskPluginCollector = new TDengineWriter.Task().getTaskPluginCollector(); + @Test public void writeSupTableBySQL() throws SQLException { // given @@ -48,8 +52,9 @@ public class DefaultDataHandlerTest { return record; }).collect(Collectors.toList()); + // when - DefaultDataHandler handler = new DefaultDataHandler(configuration); + DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector); List tables = configuration.getList("table", String.class); SchemaManager schemaManager = new SchemaManager(conn); Map tableMetas = schemaManager.loadTableMeta(tables); @@ -87,7 +92,7 @@ public class DefaultDataHandlerTest { }).collect(Collectors.toList()); // when - DefaultDataHandler handler = new DefaultDataHandler(configuration); + DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector); List tables = configuration.getList("table", String.class); SchemaManager schemaManager = new SchemaManager(conn); Map tableMetas = schemaManager.loadTableMeta(tables); @@ -127,7 +132,7 @@ public class DefaultDataHandlerTest { }).collect(Collectors.toList()); // when - DefaultDataHandler handler = new DefaultDataHandler(configuration); + DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector); List tables = configuration.getList("table", String.class); SchemaManager schemaManager = new SchemaManager(connection); Map tableMetas = schemaManager.loadTableMeta(tables); @@ -166,7 +171,7 @@ public class DefaultDataHandlerTest { }).collect(Collectors.toList()); // when - DefaultDataHandler handler = new DefaultDataHandler(configuration); + DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector); List tables = configuration.getList("table", String.class); SchemaManager schemaManager = new SchemaManager(conn); Map tableMetas = schemaManager.loadTableMeta(tables); @@ -205,7 +210,7 @@ public class DefaultDataHandlerTest { }).collect(Collectors.toList()); // when - DefaultDataHandler handler = new DefaultDataHandler(configuration); + DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector); List tables = configuration.getList("table", String.class); SchemaManager schemaManager = new SchemaManager(conn); Map tableMetas = schemaManager.loadTableMeta(tables); @@ -244,7 +249,7 @@ public class DefaultDataHandlerTest { }).collect(Collectors.toList()); // when - DefaultDataHandler handler = new DefaultDataHandler(configuration); + DefaultDataHandler handler = new DefaultDataHandler(configuration, taskPluginCollector); List tables = configuration.getList("table", String.class); SchemaManager schemaManager = new SchemaManager(conn); Map tableMetas = schemaManager.loadTableMeta(tables); @@ -260,7 +265,7 @@ public class DefaultDataHandlerTest { } private void createSupAndSubTable() throws SQLException { - try(Statement stmt = conn.createStatement()){ + try (Statement stmt = conn.createStatement()) { stmt.execute("drop database if exists scm_test"); stmt.execute("create database if not exists scm_test"); stmt.execute("use scm_test"); @@ -275,7 +280,7 @@ public class DefaultDataHandlerTest { } private void createSupTable() throws SQLException { - try (Statement stmt = conn.createStatement()){ + try (Statement stmt = conn.createStatement()) { stmt.execute("drop database if exists scm_test"); stmt.execute("create database if not exists scm_test"); stmt.execute("use scm_test"); diff --git a/tdenginewriter/src/test/resources/csv2t.json b/tdenginewriter/src/test/resources/csv2t.json new file mode 100644 index 00000000..ef5c4d04 --- /dev/null +++ b/tdenginewriter/src/test/resources/csv2t.json @@ -0,0 +1,80 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "txtfilereader", + "parameter": { + "path": [ + "/Users/yangzy/IdeaProjects/DataX/tdenginewriter/src/test/resources/weather.csv" + ], + "encoding": "UTF-8", + "column": [ + { + "index": 0, + "type": "string" + }, + { + "index": 1, + "type": "date", + "format": "yyy-MM-dd HH:mm:ss.SSS" + }, + { + "index": 2, + "type": "long" + }, + { + "index": 3, + "type": "double" + }, + { + "index": 4, + "type": "long" + }, + { + "index": 5, + "type": "string" + }, + { + "index": 6, + "type": "String" + } + ], + "fieldDelimiter": "," + } + }, + "writer": { + "name": "tdenginewriter", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "tbname", + "ts", + "temperature", + "humidity", + "is_normal", + "device_id", + "address" + ], + "connection": [ + { + "table": [ + "weather" + ], + "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/test" + } + ], + "batchSize": 100, + "ignoreTagsUnmatched": true + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdenginewriter/src/test/resources/weather.csv b/tdenginewriter/src/test/resources/weather.csv new file mode 100644 index 00000000..21c4a1aa --- /dev/null +++ b/tdenginewriter/src/test/resources/weather.csv @@ -0,0 +1,10 @@ +tb1,2022-02-20 04:05:59.255,5,8.591868744,1,abcABC123,北京朝阳望京 +tb1,2022-02-20 04:58:47.068,3,1.489693641,1,abcABC123,北京朝阳望京 +tb1,2022-02-20 06:31:09.408,1,4.026500719,1,abcABC123,北京朝阳望京 +tb1,2022-02-20 08:08:00.336,1,9.606400360,1,abcABC123,北京朝阳望京 +tb1,2022-02-20 08:28:58.053,9,7.872178184,1,abcABC123123,北京朝阳望京 +tb1,2022-02-20 10:23:20.836,9,2.699478524,1,abcABC123,北京朝阳望京 +tb1,2022-02-20 11:09:59.739,7,7.906723716,1,abcABC123,北京朝阳望京 +tb1,2022-02-20 19:08:29.315,1,5.852338895,1,abcABC123,北京朝阳望京 +tb1,2022-02-20 22:10:06.243,10,5.535007901,1,abcABC123,北京朝阳望京 +tb1,2022-02-20 23:52:43.683,10,10.642013185,1,abcABC123,北京朝阳望京