fix: super table donot need get tag value#ts-4558

This commit is contained in:
zyyang 2024-05-28 11:38:30 +08:00
parent 67333d8bfa
commit e820e0ee8a
9 changed files with 273 additions and 31 deletions

View File

@ -183,7 +183,7 @@ public class DefaultDataHandler implements DataHandler {
private int writeBatchToSupTableWithoutTbname(Connection conn, String table, List<Record> recordBatch,
Map<String, String> tag2Tbname) throws SQLException {
List<ColumnMeta> columnMetas = schemaCache.getColumnMetaList(table);
List<ColumnMeta> columnMetas = schemaCache.getColumnMetaList(table, TableType.SUP_TABLE);
List<Record> subTableExist = filterSubTableExistRecords(recordBatch, columnMetas, tag2Tbname);
List<Record> subTableNotExist = filterSubTableNotExistRecords(recordBatch, columnMetas, tag2Tbname);
@ -263,7 +263,7 @@ public class DefaultDataHandler implements DataHandler {
* record[idx(tbname)] using table tags(record[idx(t1)]) (ts, f1, f2, f3) values(record[idx(ts)], record[idx(f1)], )
*/
private int writeBatchToSupTableBySQL(Connection conn, String table, List<Record> recordBatch) throws SQLException {
List<ColumnMeta> columnMetas = this.schemaCache.getColumnMetaList(table);
List<ColumnMeta> columnMetas = this.schemaCache.getColumnMetaList(table, TableType.SUP_TABLE);
StringBuilder sb = new StringBuilder("insert into");
for (Record record : recordBatch) {
@ -356,7 +356,7 @@ public class DefaultDataHandler implements DataHandler {
int count = 0;
TimestampPrecision timestampPrecision = schemaManager.loadDatabasePrecision();
List<ColumnMeta> columnMetaList = this.schemaCache.getColumnMetaList(table);
List<ColumnMeta> columnMetaList = this.schemaCache.getColumnMetaList(table, TableType.SUP_TABLE);
ColumnMeta ts = columnMetaList.stream().filter(colMeta -> colMeta.isPrimaryKey).findFirst().get();
List<String> lines = new ArrayList<>();
@ -494,7 +494,7 @@ public class DefaultDataHandler implements DataHandler {
* insert into tb1 (ts, f1, f2) values( record[idx(ts)], record[idx(f1)], record[idx(f2)])
*/
private int writeBatchToSubTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
List<ColumnMeta> columnMetas = this.schemaCache.getColumnMetaList(table);
List<ColumnMeta> columnMetas = this.schemaCache.getColumnMetaList(table, TableType.SUB_TABLE);
StringBuilder sb = new StringBuilder();
sb.append("insert into `")
@ -569,7 +569,7 @@ public class DefaultDataHandler implements DataHandler {
* sql: insert into weather (ts, f1, f2, f3, t1, t2) values( record[idx(ts), record[idx(f1)], ...)
*/
private int writeBatchToNormalTable(Connection conn, String table, List<Record> recordBatch) throws SQLException {
List<ColumnMeta> columnMetas = this.schemaCache.getColumnMetaList(table);
List<ColumnMeta> columnMetas = this.schemaCache.getColumnMetaList(table, TableType.NML_TABLE);
StringBuilder sb = new StringBuilder();
sb.append("insert into `")

View File

@ -73,19 +73,6 @@ public final class SchemaCache {
}
public TableMeta getTableMeta(String table_name) {
//if (tableMetas.get(table_name) == null) {
// synchronized (SchemaCache.class) {
// if (tableMetas.get(table_name) == null) {
// SchemaManager schemaManager = new Schema3_0Manager(SchemaCache.conn, dbname);
//
// List<String> tables = new ArrayList<>();
// tables.add(table_name);
// Map<String, TableMeta> metas = schemaManager.loadTableMeta(tables);
//
// tableMetas.putAll(metas);
// }
// }
//}
if (!tableMetas.containsKey(table_name)) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION,
"table metadata of " + table_name + " is empty!");
@ -94,11 +81,11 @@ public final class SchemaCache {
return tableMetas.get(table_name);
}
public List<ColumnMeta> getColumnMetaList(String tbname) {
public List<ColumnMeta> getColumnMetaList(String tbname, TableType tableType) {
if (columnMetas.get(tbname).isEmpty()) {
synchronized (SchemaCache.class) {
if (columnMetas.get(tbname).isEmpty()) {
List<ColumnMeta> colMetaList = getColumnMetaListFromDb(tbname);
List<ColumnMeta> colMetaList = getColumnMetaListFromDb(tbname, tableType);
if (colMetaList.isEmpty()) {
throw DataXException.asDataXException("column metadata of table: " + tbname + " is empty!");
}
@ -110,7 +97,7 @@ public final class SchemaCache {
return columnMetas.get(tbname);
}
private List<ColumnMeta> getColumnMetaListFromDb(String tableName) {
private List<ColumnMeta> getColumnMetaListFromDb(String tableName, TableType tableType) {
List<ColumnMeta> columnMetaList = new ArrayList<>();
List<String> column_name = config.getList(Key.COLUMN, String.class)
@ -131,18 +118,21 @@ public final class SchemaCache {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, e.getMessage());
}
for (ColumnMeta colMeta : columnMetaList) {
if (!colMeta.isTag)
continue;
Object tagValue = getTagValue(tableName, colMeta.field);
colMeta.value = tagValue;
// 如果是子表才需要获取 tag
if (tableType == TableType.SUB_TABLE) {
for (ColumnMeta colMeta : columnMetaList) {
if (!colMeta.isTag)
continue;
Object tagValue = getTagValue(tableName, colMeta.field);
colMeta.value = tagValue;
}
}
return columnMetaList;
}
private Object getTagValue(String tableName, String tagName) {
String sql = "select " + tagName + " from " + tableName;
String sql = "select " + tagName + " from " + tableName + " limit 1";
Object tagValue = null;
try (Statement stmt = conn.createStatement()) {
ResultSet rs = stmt.executeQuery(sql);

View File

@ -11,7 +11,7 @@ import java.util.stream.Collectors;
public class SchemaManager {
private static final Logger LOG = LoggerFactory.getLogger(SchemaManager.class);
// private static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = "_";
// private static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = "_";
protected static final String TAG_TABLE_NAME_MAP_KEY_SPLITTER = "";
protected final Connection conn;
@ -82,7 +82,8 @@ public class SchemaManager {
for (String tbname : tables) {
if (!tableMetas.containsKey(tbname)) {
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION, "table metadata of " + tbname + " is empty!");
throw DataXException.asDataXException(TDengineWriterErrorCode.RUNTIME_EXCEPTION,
"table metadata of " + tbname + " is empty!");
}
}
} catch (SQLException e) {
@ -91,6 +92,7 @@ public class SchemaManager {
return tableMetas;
}
@Deprecated
public Map<String, List<ColumnMeta>> loadColumnMetas(List<String> tables) throws DataXException {
Map<String, List<ColumnMeta>> ret = new HashMap<>();

View File

@ -12,7 +12,7 @@ import java.sql.Statement;
@Ignore
public class Csv2TDengineTest {
private static final String host = "192.168.56.105";
private static final String host = "192.168.0.201";
@Test
public void case01() throws Throwable {

View File

@ -0,0 +1,56 @@
package com.alibaba.datax.plugin.writer.tdengine30writer;
import com.alibaba.datax.common.util.Configuration;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
import java.io.InputStream;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
public class SchemaCacheTest {
private String config;
@Test
@Ignore
public void testSchemaCache() {
List<Thread> tList = IntStream.range(0, 10).mapToObj(i -> {
Thread t = new Thread(() -> {
Configuration config = Configuration.from(this.config);
SchemaCache schemaCache = SchemaCache.getInstance(config);
List<ColumnMeta> col_metas = schemaCache.getColumnMetaList("cnpp_ads_wmct_d", TableType.SUP_TABLE);
Assert.assertEquals(10, col_metas.size());
});
return t;
}).collect(Collectors.toList());
tList.forEach(Thread::start);
tList.forEach(t -> {
try {
t.join();
} catch (InterruptedException e) {
e.printStackTrace();
}
});
}
@Before
public void before() {
InputStream in = Thread.currentThread().getContextClassLoader().getResourceAsStream("ts-4558.json");
try {
byte[] bytes = new byte[in.available()];
in.read(bytes);
this.config = new String(bytes);
} catch (Exception e) {
e.printStackTrace();
}
}
}

View File

@ -62,7 +62,7 @@
"table": [
"weather"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.56.105:6041/test"
"jdbcUrl": "jdbc:TAOS-RS://192.168.0.201:6041/test"
}
],
"batchSize": 100,

View File

@ -0,0 +1,96 @@
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"type": "string",
"value": "tb20240425"
},
{
"type": "date",
"value": "2024-04-19 01:02:03.456",
"dateFormat": "yyyy-MM-dd HH:mm:ss.SSS"
},
{
"type": "string",
"value": "device_name:abc"
},
{
"type": "string",
"value": "device_cod:123"
},
{
"type": "string",
"value": "station_id:CN"
},
{
"type": "string",
"value": "station_name:china"
},
{
"type": "string",
"value": "project_id:1"
},
{
"type": "string",
"value": "project_name:cnpp"
},
{
"type": "double",
"value": 1.11
},
{
"type": "double",
"value": 2.22
},
{
"type": "double",
"value": 3.33
}
],
"sliceRecordCount": 10
}
},
"writer": {
"name": "tdengine30writer",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"tbname",
"event_time",
"device_name",
"device_code",
"station_id",
"station_name",
"project_id",
"project_name",
"twoutfan1",
"twoutfan2",
"twoutfan3"
],
"connection": [
{
"table": [
"cnpp_ads_wmct_d"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.0.201:6041/zyyang?user=root&password=taosdata"
}
],
"ignoreTagsUnmatched": true,
"batchSize": "8182"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}

View File

@ -0,0 +1,2 @@
create stable sinktest.cnpp_ads_wmct_d (`event_time` timestamp,`twoutfan1` float,`twoutfan2` float,`twoutfan3` float)
TAGS (`device_name` NCHAR(30),`device_code` NCHAR(30) ,`station_id` NCHAR(30),`station_name` NCHAR(30),`project_id` NCHAR(30),`project_name` NCHAR(30))

View File

@ -0,0 +1,96 @@
{
"job": {
"content": [
{
"reader": {
"name": "streamreader",
"parameter": {
"column": [
{
"type": "string",
"value": "tb20240425"
},
{
"type": "date",
"value": "2024-04-19 01:02:03.456",
"dateFormat": "yyyy-MM-dd HH:mm:ss.SSS"
},
{
"type": "string",
"value": "device_name:abc"
},
{
"type": "string",
"value": "device_cod:123"
},
{
"type": "string",
"value": "station_id:CN"
},
{
"type": "string",
"value": "station_name:china"
},
{
"type": "string",
"value": "project_id:1"
},
{
"type": "string",
"value": "project_name:cnpp"
},
{
"type": "double",
"value": 1.11
},
{
"type": "double",
"value": 2.22
},
{
"type": "double",
"value": 3.33
}
],
"sliceRecordCount": 10
}
},
"writer": {
"name": "tdengine30writer",
"parameter": {
"username": "root",
"password": "taosdata",
"column": [
"tbname",
"event_time",
"device_name",
"device_code",
"station_id",
"station_name",
"project_id",
"project_name",
"twoutfan1",
"twoutfan2",
"twoutfan3"
],
"connection": [
{
"table": [
"cnpp_ads_wmct_d"
],
"jdbcUrl": "jdbc:TAOS-RS://192.168.0.201:6041/zyyang?user=root&password=taosdata"
}
],
"ignoreTagsUnmatched": true,
"batchSize": "8182"
}
}
}
],
"setting": {
"speed": {
"channel": 1
}
}
}
}