diff --git a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/DefaultDataHandler.java b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/DefaultDataHandler.java index ac8ea606..800cbbb0 100644 --- a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/DefaultDataHandler.java +++ b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/DefaultDataHandler.java @@ -183,7 +183,7 @@ public class DefaultDataHandler implements DataHandler { private int writeBatchToSupTableWithoutTbname(Connection conn, String table, List recordBatch, Map tag2Tbname) throws SQLException { - List columnMetas = schemaCache.getColumnMetaList(table); + List columnMetas = schemaCache.getColumnMetaList(table, TableType.SUP_TABLE); List subTableExist = filterSubTableExistRecords(recordBatch, columnMetas, tag2Tbname); List subTableNotExist = filterSubTableNotExistRecords(recordBatch, columnMetas, tag2Tbname); @@ -263,7 +263,7 @@ public class DefaultDataHandler implements DataHandler { * record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], ) */ private int writeBatchToSupTableBySQL(Connection conn, String table, List recordBatch) throws SQLException { - List columnMetas = this.schemaCache.getColumnMetaList(table); + List columnMetas = this.schemaCache.getColumnMetaList(table, TableType.SUP_TABLE); StringBuilder sb = new StringBuilder("insert into"); for (Record record : recordBatch) { @@ -356,7 +356,7 @@ public class DefaultDataHandler implements DataHandler { int count = 0; TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision(); - List columnMetaList = this.schemaCache.getColumnMetaList(table); + List columnMetaList = this.schemaCache.getColumnMetaList(table, TableType.SUP_TABLE); ColumnMeta ts = columnMetaList.stream().filter(colMeta -> colMeta.isPrimaryKey).findFirst().get(); List lines = new ArrayList<>(); @@ -494,7 +494,7 @@ public class DefaultDataHandler implements DataHandler { * insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)]) */ private int writeBatchToSubTable(Connection conn, String table, List recordBatch) throws SQLException { - List columnMetas = this.schemaCache.getColumnMetaList(table); + List columnMetas = this.schemaCache.getColumnMetaList(table, TableType.SUB_TABLE); StringBuilder sb = new StringBuilder(); sb.append("insert into `") @@ -569,7 +569,7 @@ public class DefaultDataHandler implements DataHandler { * sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...) */ private int writeBatchToNormalTable(Connection conn, String table, List recordBatch) throws SQLException { - List columnMetas = this.schemaCache.getColumnMetaList(table); + List columnMetas = this.schemaCache.getColumnMetaList(table, TableType.NML_TABLE); StringBuilder sb = new StringBuilder(); sb.append("insert into `") diff --git a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaCache.java b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaCache.java index 3bf46373..89efba31 100644 --- a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaCache.java +++ b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaCache.java @@ -73,19 +73,6 @@ public final class SchemaCache { } public TableMeta getTableMeta(String table_name) { - //if (tableMetas.get(table_name) == null) { - // synchronized (SchemaCache.class) { - // if (tableMetas.get(table_name) == null) { - // SchemaManager schemaManager = new Schema3_0Manager(SchemaCache.conn, dbname); - // - // List tables = new ArrayList<>(); - // tables.add(table_name); - // Map metas = schemaManager.loadTableMeta(tables); - // - // tableMetas.putAll(metas); - // } - // } - //} if (!tableMetas.containsKey(table_name)) { throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, "table metadata of " + table_name + " is empty!"); @@ -94,11 +81,11 @@ public final class SchemaCache { return tableMetas.get(table_name); } - public List getColumnMetaList(String tbname) { + public List getColumnMetaList(String tbname, TableType tableType) { if (columnMetas.get(tbname).isEmpty()) { synchronized (SchemaCache.class) { if (columnMetas.get(tbname).isEmpty()) { - List colMetaList = getColumnMetaListFromDb(tbname); + List colMetaList = getColumnMetaListFromDb(tbname, tableType); if (colMetaList.isEmpty()) { throw DataXException.asDataXException("column metadata of table: " + tbname + " is empty!"); } @@ -110,7 +97,7 @@ public final class SchemaCache { return columnMetas.get(tbname); } - private List getColumnMetaListFromDb(String tableName) { + private List getColumnMetaListFromDb(String tableName, TableType tableType) { List columnMetaList = new ArrayList<>(); List column_name = config.getList(Key.COLUMN, String.class) @@ -131,18 +118,21 @@ public final class SchemaCache { throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage()); } - for (ColumnMeta colMeta : columnMetaList) { - if (!colMeta.isTag) - continue; - Object tagValue = getTagValue(tableName, colMeta.field); - colMeta.value = tagValue; + // 如果是子表,才需要获取 tag 值 + if (tableType == TableType.SUB_TABLE) { + for (ColumnMeta colMeta : columnMetaList) { + if (!colMeta.isTag) + continue; + Object tagValue = getTagValue(tableName, colMeta.field); + colMeta.value = tagValue; + } } return columnMetaList; } private Object getTagValue(String tableName, String tagName) { - String sql = "select " + tagName + " from " + tableName; + String sql = "select " + tagName + " from " + tableName + " limit 1"; Object tagValue = null; try (Statement stmt = conn.createStatement()) { ResultSet rs = stmt.executeQuery(sql); diff --git a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaManager.java b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaManager.java index 5fa6ef1f..347f417b 100644 --- a/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaManager.java +++ b/tdengine30writer/src/main/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaManager.java @@ -11,7 +11,7 @@ import java.util.stream.Collectors; public class SchemaManager { private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class); -// private static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = "_"; + // private static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = "_"; protected static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = ""; protected final Connection conn; @@ -82,7 +82,8 @@ public class SchemaManager { for (String tbname : tables) { if (!tableMetas.containsKey(tbname)) { - throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, "table metadata of " + tbname + " is empty!"); + throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, + "table metadata of " + tbname + " is empty!"); } } } catch (SQLException e) { @@ -91,6 +92,7 @@ public class SchemaManager { return tableMetas; } + @Deprecated public Map> loadColumnMetas(List tables) throws DataXException { Map> ret = new HashMap<>(); diff --git a/tdengine30writer/src/test/java/com/alibaba/datax/plugin/writer/tdengine30writer/Csv2TDengineTest.java b/tdengine30writer/src/test/java/com/alibaba/datax/plugin/writer/tdengine30writer/Csv2TDengineTest.java index ea54689a..88dad7d5 100644 --- a/tdengine30writer/src/test/java/com/alibaba/datax/plugin/writer/tdengine30writer/Csv2TDengineTest.java +++ b/tdengine30writer/src/test/java/com/alibaba/datax/plugin/writer/tdengine30writer/Csv2TDengineTest.java @@ -12,7 +12,7 @@ import java.sql.Statement; @Ignore public class Csv2TDengineTest { - private static final String host = "192.168.56.105"; + private static final String host = "192.168.0.201"; @Test public void case01() throws Throwable { diff --git a/tdengine30writer/src/test/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaCacheTest.java b/tdengine30writer/src/test/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaCacheTest.java new file mode 100644 index 00000000..ddea37f2 --- /dev/null +++ b/tdengine30writer/src/test/java/com/alibaba/datax/plugin/writer/tdengine30writer/SchemaCacheTest.java @@ -0,0 +1,56 @@ +package com.alibaba.datax.plugin.writer.tdengine30writer; + +import com.alibaba.datax.common.util.Configuration; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Ignore; +import org.junit.Test; + +import java.io.InputStream; +import java.util.List; +import java.util.stream.Collectors; +import java.util.stream.IntStream; + +public class SchemaCacheTest { + + private String config; + + @Test + @Ignore + public void testSchemaCache() { + List tList = IntStream.range(0, 10).mapToObj(i -> { + Thread t = new Thread(() -> { + Configuration config = Configuration.from(this.config); + SchemaCache schemaCache = SchemaCache.getInstance(config); + + List col_metas = schemaCache.getColumnMetaList("cnpp_ads_wmct_d", TableType.SUP_TABLE); + Assert.assertEquals(10, col_metas.size()); + + }); + return t; + }).collect(Collectors.toList()); + + tList.forEach(Thread::start); + + tList.forEach(t -> { + try { + t.join(); + } catch (InterruptedException e) { + e.printStackTrace(); + } + }); + } + + @Before + public void before() { + InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream("ts-4558.json"); + try { + byte[] bytes = new byte[in.available()]; + in.read(bytes); + this.config = new String(bytes); + } catch (Exception e) { + e.printStackTrace(); + } + } + +} \ No newline at end of file diff --git a/tdengine30writer/src/test/resources/csv2t.json b/tdengine30writer/src/test/resources/csv2t.json index 37fa3c79..c81c88bd 100644 --- a/tdengine30writer/src/test/resources/csv2t.json +++ b/tdengine30writer/src/test/resources/csv2t.json @@ -62,7 +62,7 @@ "table": [ "weather" ], - "jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/test" + "jdbcUrl": "jdbc:TAOS-RS://192.168.0.201:6041/test" } ], "batchSize": 100, diff --git a/tdengine30writer/src/test/resources/ts-4558.json b/tdengine30writer/src/test/resources/ts-4558.json new file mode 100644 index 00000000..28863007 --- /dev/null +++ b/tdengine30writer/src/test/resources/ts-4558.json @@ -0,0 +1,96 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "streamreader", + "parameter": { + "column": [ + { + "type": "string", + "value": "tb20240425" + }, + { + "type": "date", + "value": "2024-04-19 01:02:03.456", + "dateFormat": "yyyy-MM-dd HH:mm:ss.SSS" + }, + { + "type": "string", + "value": "device_name:abc" + }, + { + "type": "string", + "value": "device_cod:123" + }, + { + "type": "string", + "value": "station_id:CN" + }, + { + "type": "string", + "value": "station_name:china" + }, + { + "type": "string", + "value": "project_id:1" + }, + { + "type": "string", + "value": "project_name:cnpp" + }, + { + "type": "double", + "value": 1.11 + }, + { + "type": "double", + "value": 2.22 + }, + { + "type": "double", + "value": 3.33 + } + ], + "sliceRecordCount": 10 + } + }, + "writer": { + "name": "tdengine30writer", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "tbname", + "event_time", + "device_name", + "device_code", + "station_id", + "station_name", + "project_id", + "project_name", + "twoutfan1", + "twoutfan2", + "twoutfan3" + ], + "connection": [ + { + "table": [ + "cnpp_ads_wmct_d" + ], + "jdbcUrl": "jdbc:TAOS-RS://192.168.0.201:6041/zyyang?user=root&password=taosdata" + } + ], + "ignoreTagsUnmatched": true, + "batchSize": "8182" + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file diff --git a/tdengine30writer/src/test/resources/ts-4558.sql b/tdengine30writer/src/test/resources/ts-4558.sql new file mode 100644 index 00000000..0d57bc4c --- /dev/null +++ b/tdengine30writer/src/test/resources/ts-4558.sql @@ -0,0 +1,2 @@ +create stable sinktest.cnpp_ads_wmct_d (`event_time` timestamp,`twoutfan1` float,`twoutfan2` float,`twoutfan3` float) +TAGS (`device_name` NCHAR(30),`device_code` NCHAR(30) ,`station_id` NCHAR(30),`station_name` NCHAR(30),`project_id` NCHAR(30),`project_name` NCHAR(30)) \ No newline at end of file diff --git a/tdengine30writer/src/test/resources/ts-4588-2.json b/tdengine30writer/src/test/resources/ts-4588-2.json new file mode 100644 index 00000000..28863007 --- /dev/null +++ b/tdengine30writer/src/test/resources/ts-4588-2.json @@ -0,0 +1,96 @@ +{ + "job": { + "content": [ + { + "reader": { + "name": "streamreader", + "parameter": { + "column": [ + { + "type": "string", + "value": "tb20240425" + }, + { + "type": "date", + "value": "2024-04-19 01:02:03.456", + "dateFormat": "yyyy-MM-dd HH:mm:ss.SSS" + }, + { + "type": "string", + "value": "device_name:abc" + }, + { + "type": "string", + "value": "device_cod:123" + }, + { + "type": "string", + "value": "station_id:CN" + }, + { + "type": "string", + "value": "station_name:china" + }, + { + "type": "string", + "value": "project_id:1" + }, + { + "type": "string", + "value": "project_name:cnpp" + }, + { + "type": "double", + "value": 1.11 + }, + { + "type": "double", + "value": 2.22 + }, + { + "type": "double", + "value": 3.33 + } + ], + "sliceRecordCount": 10 + } + }, + "writer": { + "name": "tdengine30writer", + "parameter": { + "username": "root", + "password": "taosdata", + "column": [ + "tbname", + "event_time", + "device_name", + "device_code", + "station_id", + "station_name", + "project_id", + "project_name", + "twoutfan1", + "twoutfan2", + "twoutfan3" + ], + "connection": [ + { + "table": [ + "cnpp_ads_wmct_d" + ], + "jdbcUrl": "jdbc:TAOS-RS://192.168.0.201:6041/zyyang?user=root&password=taosdata" + } + ], + "ignoreTagsUnmatched": true, + "batchSize": "8182" + } + } + } + ], + "setting": { + "speed": { + "channel": 1 + } + } + } +} \ No newline at end of file