mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-03 17:41:37 +08:00
obreader: support table name with schema name
This commit is contained in:
parent
f6f94821bc
commit
8d9e936047
@ -143,10 +143,20 @@ public class ObReaderUtils {
|
|||||||
String sql = "show index from " + tableName + " where Key_name='PRIMARY'";
|
String sql = "show index from " + tableName + " where Key_name='PRIMARY'";
|
||||||
if (isOracleMode(context.getCompatibleMode())) {
|
if (isOracleMode(context.getCompatibleMode())) {
|
||||||
tableName = tableName.toUpperCase();
|
tableName = tableName.toUpperCase();
|
||||||
sql = "SELECT cols.column_name Column_name " +
|
String schema;
|
||||||
|
if (tableName.contains(".")) {
|
||||||
|
schema = String.format("'%s'", tableName.substring(0, tableName.indexOf(".")));
|
||||||
|
tableName = tableName.substring(tableName.indexOf(".") + 1);
|
||||||
|
} else {
|
||||||
|
schema = "(select sys_context('USERENV','current_schema') from dual)";
|
||||||
|
}
|
||||||
|
sql = String.format(
|
||||||
|
"SELECT cols.column_name Column_name " +
|
||||||
"FROM all_constraints cons, all_cons_columns cols " +
|
"FROM all_constraints cons, all_cons_columns cols " +
|
||||||
"WHERE cols.table_name = '" + tableName + "' AND cons.constraint_type = 'P' " +
|
"WHERE cols.table_name = '%s' AND cons.constraint_type = 'P' " +
|
||||||
"AND cons.constraint_name = cols.constraint_name AND cons.owner = cols.owner";
|
"AND cons.constraint_name = cols.constraint_name " +
|
||||||
|
"AND cons.owner = cols.owner and cons.OWNER = %s",
|
||||||
|
tableName, schema);
|
||||||
}
|
}
|
||||||
LOG.info("get primary key by sql: " + sql);
|
LOG.info("get primary key by sql: " + sql);
|
||||||
Statement ps = null;
|
Statement ps = null;
|
||||||
@ -156,25 +166,30 @@ public class ObReaderUtils {
|
|||||||
try {
|
try {
|
||||||
ps = conn.createStatement();
|
ps = conn.createStatement();
|
||||||
rs = ps.executeQuery(sql);
|
rs = ps.executeQuery(sql);
|
||||||
|
boolean hasPk = false;
|
||||||
while (rs.next()) {
|
while (rs.next()) {
|
||||||
|
hasPk = true;
|
||||||
String columnName = rs.getString("Column_name");
|
String columnName = rs.getString("Column_name");
|
||||||
columnName = escapeDatabaseKeywords(columnName);
|
columnName = escapeDatabaseKeywords(columnName);
|
||||||
if (!isEscapeMode(columnName)) {
|
if (!isEscapeMode(columnName)) {
|
||||||
columnName.toLowerCase();
|
columnName = columnName.toLowerCase();
|
||||||
}
|
}
|
||||||
if (!realIndex.contains(columnName)) {
|
if (!realIndex.contains(columnName)) {
|
||||||
realIndex.add(columnName);
|
realIndex.add(columnName);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if (hasPk) {
|
||||||
String[] pks = new String[realIndex.size()];
|
String[] pks = new String[realIndex.size()];
|
||||||
realIndex.toArray(pks);
|
realIndex.toArray(pks);
|
||||||
return pks;
|
return pks;
|
||||||
|
}
|
||||||
} catch (Throwable e) {
|
} catch (Throwable e) {
|
||||||
LOG.error("show index from table fail :" + sql, e);
|
LOG.error("show index from table fail :" + sql, e);
|
||||||
} finally {
|
} finally {
|
||||||
close(rs, ps, null);
|
close(rs, ps, null);
|
||||||
}
|
}
|
||||||
|
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -357,7 +372,6 @@ public class ObReaderUtils {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
SQLExpr expr = SQLUtils.toSQLExpr(context.getWhere(), "mysql");
|
SQLExpr expr = SQLUtils.toSQLExpr(context.getWhere(), "mysql");
|
||||||
LOG.info("expr: " + expr);
|
|
||||||
List<String> allColumnsInTab = getAllColumnFromTab(conn, context.getTable());
|
List<String> allColumnsInTab = getAllColumnFromTab(conn, context.getTable());
|
||||||
List<String> allColNames = getColNames(allColumnsInTab, expr);
|
List<String> allColNames = getColNames(allColumnsInTab, expr);
|
||||||
|
|
||||||
@ -449,9 +463,19 @@ public class ObReaderUtils {
|
|||||||
Map<String, List<String>> allIndex = new HashMap<String, List<String>>();
|
Map<String, List<String>> allIndex = new HashMap<String, List<String>>();
|
||||||
String sql = "show index from " + tableName;
|
String sql = "show index from " + tableName;
|
||||||
if (isOracleMode(compatibleMode)) {
|
if (isOracleMode(compatibleMode)) {
|
||||||
|
String schema;
|
||||||
tableName = tableName.toUpperCase();
|
tableName = tableName.toUpperCase();
|
||||||
sql = "SELECT INDEX_NAME Key_name, COLUMN_NAME Column_name " +
|
if (tableName.contains(".")) {
|
||||||
"from dba_ind_columns where TABLE_NAME = '" + tableName + "' " +
|
schema = String.format("'%s'", tableName.substring(0, tableName.indexOf(".")));
|
||||||
|
tableName = tableName.substring(tableName.indexOf(".") + 1);
|
||||||
|
} else {
|
||||||
|
schema = "(select sys_context('USERENV','current_schema') from dual)";
|
||||||
|
}
|
||||||
|
|
||||||
|
sql = String.format(
|
||||||
|
"SELECT INDEX_NAME Key_name, COLUMN_NAME Column_name " +
|
||||||
|
"from all_ind_columns " +
|
||||||
|
"where TABLE_NAME = '%s' and TABLE_OWNER = %s " +
|
||||||
" union all " +
|
" union all " +
|
||||||
"SELECT DISTINCT " +
|
"SELECT DISTINCT " +
|
||||||
"CASE " +
|
"CASE " +
|
||||||
@ -461,9 +485,12 @@ public class ObReaderUtils {
|
|||||||
"END AS Key_name, " +
|
"END AS Key_name, " +
|
||||||
"cols.column_name Column_name " +
|
"cols.column_name Column_name " +
|
||||||
"FROM all_constraints cons, all_cons_columns cols " +
|
"FROM all_constraints cons, all_cons_columns cols " +
|
||||||
"WHERE cols.table_name = '" + tableName + "' AND cons.constraint_type in('P', 'U') " +
|
"WHERE cols.table_name = '%s' AND cons.constraint_type in('P', 'U') " +
|
||||||
"AND cons.constraint_name = cols.constraint_name AND cons.owner = cols.owner";
|
"AND cons.constraint_name = cols.constraint_name AND cons.owner = cols.owner " +
|
||||||
|
"AND cons.owner = %s",
|
||||||
|
tableName, schema, tableName, schema);
|
||||||
}
|
}
|
||||||
|
|
||||||
Statement stmt = null;
|
Statement stmt = null;
|
||||||
ResultSet rs = null;
|
ResultSet rs = null;
|
||||||
|
|
||||||
@ -486,11 +513,26 @@ public class ObReaderUtils {
|
|||||||
// add primary key to all index
|
// add primary key to all index
|
||||||
if (allIndex.containsKey("PRIMARY")) {
|
if (allIndex.containsKey("PRIMARY")) {
|
||||||
List<String> colsInPrimary = allIndex.get("PRIMARY");
|
List<String> colsInPrimary = allIndex.get("PRIMARY");
|
||||||
for (String keyName : allIndex.keySet()) {
|
Iterator<Map.Entry<String, List<String>>> iterator = allIndex.entrySet().iterator();
|
||||||
if (keyName.equals("PRIMARY")) {
|
while (iterator.hasNext()) {
|
||||||
|
Map.Entry<String, List<String>> entry = iterator.next();
|
||||||
|
if (entry.getKey().equals("PRIMARY")) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
allIndex.get(keyName).addAll(colsInPrimary);
|
|
||||||
|
// remove the index which is identical with primary key
|
||||||
|
List<String> indexColumns = entry.getValue();
|
||||||
|
if (colsInPrimary.equals(indexColumns)) {
|
||||||
|
iterator.remove();
|
||||||
|
} else {
|
||||||
|
// add primary key to the index if the index is not on the column
|
||||||
|
colsInPrimary.forEach(
|
||||||
|
c -> {
|
||||||
|
if (!indexColumns.contains(c)) {
|
||||||
|
indexColumns.add(c);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@ -504,6 +546,7 @@ public class ObReaderUtils {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
* find out the indexes which contains all columns in where conditions
|
||||||
* @param conn
|
* @param conn
|
||||||
* @param table
|
* @param table
|
||||||
* @param colNamesInCondition
|
* @param colNamesInCondition
|
||||||
@ -517,7 +560,7 @@ public class ObReaderUtils {
|
|||||||
return indexNames;
|
return indexNames;
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info("columNamesInConditions: " + String.join(",", colNamesInCondition));
|
LOG.info("columnNamesInConditions: " + String.join(",", colNamesInCondition));
|
||||||
|
|
||||||
Map<String, List<String>> allIndex = getAllIndex(conn, table, compatibleMode);
|
Map<String, List<String>> allIndex = getAllIndex(conn, table, compatibleMode);
|
||||||
for (String keyName : allIndex.keySet()) {
|
for (String keyName : allIndex.keySet()) {
|
||||||
@ -528,7 +571,7 @@ public class ObReaderUtils {
|
|||||||
if (allIndex.get(keyName).size() < colNamesInCondition.size()) {
|
if (allIndex.get(keyName).size() < colNamesInCondition.size()) {
|
||||||
indexNotMatch = true;
|
indexNotMatch = true;
|
||||||
} else {
|
} else {
|
||||||
// the the first number columns of this index
|
// the first number columns of this index
|
||||||
int num = colNamesInCondition.size();
|
int num = colNamesInCondition.size();
|
||||||
for (String colName : allIndex.get(keyName)) {
|
for (String colName : allIndex.get(keyName)) {
|
||||||
if (!colNamesInCondition.contains(colName)) {
|
if (!colNamesInCondition.contains(colName)) {
|
||||||
|
Loading…
Reference in New Issue
Block a user