support wildcard column

This commit is contained in:
fariel 2021-09-01 20:37:00 +08:00
parent 0582da63a5
commit 1e03c200f7
4 changed files with 44 additions and 1 deletions

1
build.sh Executable file
View File

@ -0,0 +1 @@
mvn -U -pl starrockswriter -am clean package assembly:assembly -Dmaven.test.skip=true

View File

@ -93,6 +93,11 @@ public class StarRocksWriter extends Writer {
@Override
public void init() {
options = new StarRocksWriterOptions(super.getPluginJobConf());
if (options.isWildcardColumn()) {
Connection conn = DBUtil.getConnection(DataBaseType.MySql, options.getJdbcUrl(), options.getUsername(), options.getPassword());
List<String> columns = StarRocksWriterUtil.getStarRocksColumns(conn, options.getDatabase(), options.getTable());
options.setInfoCchemaColumns(columns);
}
writerManager = new StarRocksWriterManager(options);
rowSerializer = StarRocksSerializerFactory.createSerializer(options);
}

View File

@ -39,9 +39,16 @@ public class StarRocksWriterOptions implements Serializable {
private static final String KEY_LOAD_PROPS = "loadProps";
private final Configuration options;
private List<String> infoCchemaColumns;
private List<String> userSetColumns;
private boolean isWildcardColumn;
public StarRocksWriterOptions(Configuration options) {
this.options = options;
this.userSetColumns = options.getList(KEY_COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList());
if (1 == options.getList(KEY_COLUMN, String.class).size() && "*".trim().equals(options.getList(KEY_COLUMN, String.class).get(0))) {
this.isWildcardColumn = true;
}
}
public void doPretreatment() {
@ -74,7 +81,18 @@ public class StarRocksWriterOptions implements Serializable {
}
public List<String> getColumns() {
return options.getList(KEY_COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList());
if (isWildcardColumn) {
return this.infoCchemaColumns;
}
return this.userSetColumns;
}
public boolean isWildcardColumn() {
return this.isWildcardColumn;
}
public void setInfoCchemaColumns(List<String> cols) {
this.infoCchemaColumns = cols;
}
public List<String> getPreSqlList() {

View File

@ -12,6 +12,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.Statement;
import java.util.*;
@ -20,6 +21,24 @@ public final class StarRocksWriterUtil {
private StarRocksWriterUtil() {}
public static List<String> getStarRocksColumns(Connection conn, String databaseName, String tableName) {
String currentSql = String.format("SELECT COLUMN_NAME FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA` = '%s' AND `TABLE_NAME` = '%s' ORDER BY `ORDINAL_POSITION` ASC;", databaseName, tableName);
List<String> columns = new ArrayList<>();
ResultSet rs = null;
try {
rs = DBUtil.query(conn, currentSql);
while (DBUtil.asyncResultSetNext(rs)) {
String colName = rs.getString("COLUMN_NAME");
columns.add(colName);
}
return columns;
} catch (Exception e) {
throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
} finally {
DBUtil.closeDBResources(rs, null, null);
}
}
public static List<String> renderPreOrPostSqls(List<String> preOrPostSqls, String tableName) {
if (null == preOrPostSqls) {
return Collections.emptyList();