From 1e03c200f71ea3f042d9b69f1cb1fa511fe6062d Mon Sep 17 00:00:00 2001 From: fariel Date: Wed, 1 Sep 2021 20:37:00 +0800 Subject: [PATCH] support wildcard column --- build.sh | 1 + .../starrockswriter/StarRocksWriter.java | 5 +++++ .../StarRocksWriterOptions.java | 20 ++++++++++++++++++- .../util/StarRocksWriterUtil.java | 19 ++++++++++++++++++ 4 files changed, 44 insertions(+), 1 deletion(-) create mode 100755 build.sh diff --git a/build.sh b/build.sh new file mode 100755 index 00000000..e3992acb --- /dev/null +++ b/build.sh @@ -0,0 +1 @@ +mvn -U -pl starrockswriter -am clean package assembly:assembly -Dmaven.test.skip=true diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriter.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriter.java index 666a99d9..9d8fdf15 100755 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriter.java +++ b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriter.java @@ -93,6 +93,11 @@ public class StarRocksWriter extends Writer { @Override public void init() { options = new StarRocksWriterOptions(super.getPluginJobConf()); + if (options.isWildcardColumn()) { + Connection conn = DBUtil.getConnection(DataBaseType.MySql, options.getJdbcUrl(), options.getUsername(), options.getPassword()); + List columns = StarRocksWriterUtil.getStarRocksColumns(conn, options.getDatabase(), options.getTable()); + options.setInfoCchemaColumns(columns); + } writerManager = new StarRocksWriterManager(options); rowSerializer = StarRocksSerializerFactory.createSerializer(options); } diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriterOptions.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriterOptions.java index 5180512f..9e4abd12 100644 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriterOptions.java +++ b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/StarRocksWriterOptions.java @@ -39,9 +39,16 @@ public class StarRocksWriterOptions implements Serializable { private static final String KEY_LOAD_PROPS = "loadProps"; private final Configuration options; + private List infoCchemaColumns; + private List userSetColumns; + private boolean isWildcardColumn; public StarRocksWriterOptions(Configuration options) { this.options = options; + this.userSetColumns = options.getList(KEY_COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList()); + if (1 == options.getList(KEY_COLUMN, String.class).size() && "*".trim().equals(options.getList(KEY_COLUMN, String.class).get(0))) { + this.isWildcardColumn = true; + } } public void doPretreatment() { @@ -74,7 +81,18 @@ public class StarRocksWriterOptions implements Serializable { } public List getColumns() { - return options.getList(KEY_COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList()); + if (isWildcardColumn) { + return this.infoCchemaColumns; + } + return this.userSetColumns; + } + + public boolean isWildcardColumn() { + return this.isWildcardColumn; + } + + public void setInfoCchemaColumns(List cols) { + this.infoCchemaColumns = cols; } public List getPreSqlList() { diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/util/StarRocksWriterUtil.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/util/StarRocksWriterUtil.java index c3b5d8d1..8de4ad60 100755 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/util/StarRocksWriterUtil.java +++ b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/starrockswriter/util/StarRocksWriterUtil.java @@ -12,6 +12,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.sql.Connection; +import java.sql.ResultSet; import java.sql.Statement; import java.util.*; @@ -20,6 +21,24 @@ public final class StarRocksWriterUtil { private StarRocksWriterUtil() {} + public static List getStarRocksColumns(Connection conn, String databaseName, String tableName) { + String currentSql = String.format("SELECT COLUMN_NAME FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA` = '%s' AND `TABLE_NAME` = '%s' ORDER BY `ORDINAL_POSITION` ASC;", databaseName, tableName); + List columns = new ArrayList<>(); + ResultSet rs = null; + try { + rs = DBUtil.query(conn, currentSql); + while (DBUtil.asyncResultSetNext(rs)) { + String colName = rs.getString("COLUMN_NAME"); + columns.add(colName); + } + return columns; + } catch (Exception e) { + throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null); + } finally { + DBUtil.closeDBResources(rs, null, null); + } + } + public static List renderPreOrPostSqls(List preOrPostSqls, String tableName) { if (null == preOrPostSqls) { return Collections.emptyList();