本次提交更新了转义的字符不需要进行大小写转换

This commit is contained in:
sanChouIsACat 2021-12-28 18:25:16 +08:00
parent b1334308a0
commit ce29ae7ee5
3 changed files with 96 additions and 57 deletions

View File

@ -1,37 +1,22 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.util;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.*;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr;
import com.alibaba.druid.sql.ast.expr.SQLBinaryOperator;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.datax.common.element.BoolColumn;
import com.alibaba.datax.common.element.BytesColumn;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.DoubleColumn;
import com.alibaba.datax.common.element.LongColumn;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.element.StringColumn;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr;
import com.alibaba.druid.sql.ast.expr.SQLBinaryOperator;
import javax.xml.crypto.Data;
import java.sql.*;
import java.util.*;
import java.util.Map.Entry;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
public class ObReaderUtils {
@ -53,21 +38,32 @@ public class ObReaderUtils {
return new HashSet(Arrays.asList(keywords.split(",")));
}
public static void escapeDatabaseKeywords(List<String> keywords) {
public static String escapeDatabaseKeywords(String keyword) {
if (databaseKeywords == null) {
if (isOracleMode(compatibleMode.toString())) {
if (isOracleMode(compatibleMode)) {
databaseKeywords = keywordsFromString2HashSet(ORACLE_KEYWORDS);
} else {
databaseKeywords = keywordsFromString2HashSet(MYSQL_KEYWORDS);
}
}
char escapeChar = isOracleMode(compatibleMode.toString()) ? '"' : '`';
char escapeChar = isOracleMode(compatibleMode) ? '"' : '`';
if (databaseKeywords.contains(keyword.toUpperCase())) {
keyword = escapeChar + keyword + escapeChar;
}
return keyword;
}
public static void escapeDatabaseKeywords(List<String> keywords) {
for (int i = 0; i < keywords.size(); i++) {
String keyword = keywords.get(i);
if (databaseKeywords.contains(keyword.toUpperCase())) {
keyword = escapeChar + keyword + escapeChar;
}
keywords.set(i, keyword);
keywords.set(i, escapeDatabaseKeywords(keywords.get(i)));
}
}
public static Boolean isEscapeMode(String keyword) {
if (isOracleMode(compatibleMode)) {
return keyword.startsWith("\"") && keyword.endsWith("\"");
} else {
return keyword.startsWith("`") && keyword.endsWith("`");
}
}
@ -151,12 +147,15 @@ public class ObReaderUtils {
ps = conn.createStatement();
rs = ps.executeQuery(sql);
while (rs.next()) {
String columnName = StringUtils.lowerCase(rs.getString("Column_name"));
String columnName = rs.getString("Column_name");
columnName = escapeDatabaseKeywords(columnName);
if (!isEscapeMode(columnName)) {
columnName.toLowerCase();
}
if (!realIndex.contains(columnName)) {
realIndex.add(columnName);
}
}
escapeDatabaseKeywords(realIndex);
String[] pks = new String[realIndex.size()];
realIndex.toArray(pks);

View File

@ -62,6 +62,7 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
private ObPartitionIdCalculator partCalculator = null;
private HashMap<Long, List<Record>> groupInsertValues;
List<Record> unknownPartRecords = new ArrayList<Record>();
// private List<Record> unknownPartRecords;
private List<Integer> partitionKeyIndexes;
@ -307,6 +308,18 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
}
}
}
if(unknownPartRecords.size()>0){
int retry = 0;
while (true) {
try {
concurrentWriter.addBatchRecords(unknownPartRecords);
break;
} catch (InterruptedException e) {
retry++;
LOG.info("Concurrent table writer is interrupted, retry {}", retry);
}
}
}
}
private void addRecordToCache(final Record record) {
@ -347,21 +360,24 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
groupInsertValues.put(partId, groupValues);
}
} else {
LOG.warn("add unknown part record {}", record);
List<Record> unknownPartRecords = new ArrayList<Record>();
LOG.debug("add unknown part record {}", record);
unknownPartRecords.add(record);
int i = 0;
while (true) {
if (i > 0) {
LOG.info("retry add batch record the {} times", i);
}
try {
concurrentWriter.addBatchRecords(unknownPartRecords);
break;
} catch (InterruptedException e) {
LOG.info("Concurrent table writer is interrupted");
if(unknownPartRecords.size()>batchSize){
while (true) {
if (i > 0) {
LOG.info("retry add batch record the {} times", i);
}
try {
concurrentWriter.addBatchRecords(unknownPartRecords);
break;
} catch (InterruptedException e) {
LOG.info("Concurrent table writer is interrupted");
}
}
}
}
}

View File

@ -11,7 +11,6 @@ import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.*;
import java.util.stream.Collectors;
public class ObWriterUtils {
@ -26,8 +25,8 @@ public class ObWriterUtils {
return new HashSet(Arrays.asList(keywords.split(",")));
}
public static void escapeDatabaseKeywords(List<String> keywords) {
//判断是否需要更改关键字集合
//java中的String的坑
public static String escapeDatabaseKeywords(String keyword) {
if (databaseKeywords == null) {
if (isOracleMode()) {
databaseKeywords = keywordsFromString2HashSet(ORACLE_KEYWORDS);
@ -36,12 +35,22 @@ public class ObWriterUtils {
}
}
char escapeChar = isOracleMode() ? '"' : '`';
if (databaseKeywords.contains(keyword.toUpperCase())) {
keyword = escapeChar + keyword + escapeChar;
}
return keyword;
}
public static void escapeDatabaseKeywords(List<String> keywords) {
for (int i = 0; i < keywords.size(); i++) {
String keyword = keywords.get(i);
if (databaseKeywords.contains(keyword.toUpperCase())) {
keyword = escapeChar + keyword + escapeChar;
}
keywords.set(i, keyword);
keywords.set(i, escapeDatabaseKeywords(keywords.get(i)));
}
}
public static Boolean isEscapeMode(String keyword){
if(isOracleMode()){
return keyword.startsWith("\"") && keyword.endsWith("\"");
}else{
return keyword.startsWith("`") && keyword.endsWith("`");
}
}
public static boolean isMemstoreFull(Connection conn, double memstoreThreshold) {
@ -94,7 +103,16 @@ public class ObWriterUtils {
}
private static int[] getColumnIndex(List<String> columnsInIndex, List<String> allColumns) {
allColumns = allColumns.stream().map(String::toUpperCase).collect(Collectors.toList());
/**
* JDK8的stream模型将一种数据结构转化成通用的数据模型并可在该模型上进行操作
* map:接受一个函数引用用于操作元素
* collect接受一个Collectors方法用于将中间数据模型转化成目标数据结构
*/
for (int i = 0; i < allColumns.size(); i++) {
if (!ObWriterUtils.isEscapeMode(allColumns.get(i))) {
allColumns.set(i, allColumns.get(i).toUpperCase());
}
}
int[] colIdx = new int[columnsInIndex.size()];
for (int i = 0; i < columnsInIndex.size(); i++) {
int index = allColumns.indexOf(columnsInIndex.get(i));
@ -146,7 +164,11 @@ public class ObWriterUtils {
rs = stmt.executeQuery(sql);
while (rs.next()) {
String keyName = rs.getString("Key_name");
String columnName = StringUtils.upperCase(rs.getString("Column_name"));
String columnName = rs.getString("Column_name");
columnName=escapeDatabaseKeywords(columnName);
if(!ObWriterUtils.isEscapeMode(columnName)){
columnName=columnName.toUpperCase();
}
List<String> s = uniqueKeys.get(keyName);
if (s == null) {
s = new ArrayList();
@ -159,6 +181,7 @@ public class ObWriterUtils {
} finally {
asyncClose(rs, stmt, null);
}
//ObWriterUtils.escapeDatabaseKeywords(uniqueKeys);
return uniqueKeys;
}
@ -315,6 +338,7 @@ public class ObWriterUtils {
* @param e
* @return
*/
public static boolean isFatalError(SQLException e) {
String sqlState = e.getSQLState();
if (StringUtils.startsWith(sqlState, "08")) {