把冗余代码合并成了一个函数,删除了不必要的注释

This commit is contained in:
sanChouIsACat 2021-12-29 19:23:34 +08:00
parent 042aa2d865
commit c96a366a4e
3 changed files with 28 additions and 54 deletions

View File

@ -63,7 +63,7 @@ public class ReaderTask extends CommonRdbmsReader.Task {
} }
jdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:") + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时 jdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:") + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时
if (ObReaderUtils.compatibleMode == ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE) { if (ObReaderUtils.compatibleMode.equals(ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE)) {
compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE; compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE;
} }
LOG.info("this is ob1_0 jdbc url. user=" + username + " :url=" + jdbcUrl); LOG.info("this is ob1_0 jdbc url. user=" + username + " :url=" + jdbcUrl);

View File

@ -294,31 +294,14 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
} }
private void addLeftRecords() { private void addLeftRecords() {
//不需要刷新Cache已经是最后一批数据了
for (List<Record> groupValues : groupInsertValues.values()) { for (List<Record> groupValues : groupInsertValues.values()) {
if (groupValues.size() > 0 ) { if (groupValues.size() > 0 ) {
int retry = 0; addRecordsToWriteQueue(groupValues);
while (true) {
try {
concurrentWriter.addBatchRecords(groupValues);
break;
} catch (InterruptedException e) {
retry++;
LOG.info("Concurrent table writer is interrupted, retry {}", retry);
}
}
} }
} }
if (unknownPartRecords.size() > 0) { if (unknownPartRecords.size() > 0) {
int retry = 0; addRecordsToWriteQueue(unknownPartRecords);
while (true) {
try {
concurrentWriter.addBatchRecords(unknownPartRecords);
break;
} catch (InterruptedException e) {
retry++;
LOG.info("Concurrent table writer is interrupted, retry {}", retry);
}
}
} }
} }
@ -344,43 +327,40 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
groupValues.add(record); groupValues.add(record);
if (groupValues.size() >= batchSize) { if (groupValues.size() >= batchSize) {
int i = 0; int i = 0;
while (true) { groupValues =addRecordsToWriteQueue(groupValues);
if (i > 0) {
LOG.info("retry add batch record the {} times", i);
}
try {
concurrentWriter.addBatchRecords(groupValues);
printEveryTime();
break;
} catch (InterruptedException e) {
LOG.info("Concurrent table writer is interrupted");
}
}
groupValues = new ArrayList<Record>(batchSize);
groupInsertValues.put(partId, groupValues); groupInsertValues.put(partId, groupValues);
} }
} else { } else {
LOG.debug("add unknown part record {}", record); LOG.debug("add unknown part record {}", record);
unknownPartRecords.add(record); unknownPartRecords.add(record);
int i = 0;
if (unknownPartRecords.size() > batchSize) { if (unknownPartRecords.size() > batchSize) {
unknownPartRecords=addRecordsToWriteQueue(unknownPartRecords);
}
}
}
/**
*
* @param records
* @return 返回一个新的Cache用于存储接下来的数据
*/
private List<Record> addRecordsToWriteQueue(List<Record> records) {
int i = 0;
while (true) { while (true) {
if (i > 0) { if (i > 0) {
LOG.info("retry add batch record the {} times", i); LOG.info("retry add batch record the {} times", i);
} }
try { try {
concurrentWriter.addBatchRecords(unknownPartRecords); concurrentWriter.addBatchRecords(records);
break; break;
} catch (InterruptedException e) { } catch (InterruptedException e) {
i++;
LOG.info("Concurrent table writer is interrupted"); LOG.info("Concurrent table writer is interrupted");
} }
} }
return new ArrayList<Record>(batchSize);
} }
}
}
private void checkMemStore() { private void checkMemStore() {
Connection checkConn = checkConnHolder.reconnect(); Connection checkConn = checkConnHolder.reconnect();
long now = System.currentTimeMillis(); long now = System.currentTimeMillis();

View File

@ -25,7 +25,6 @@ public class ObWriterUtils {
return new HashSet(Arrays.asList(keywords.split(","))); return new HashSet(Arrays.asList(keywords.split(",")));
} }
//java中的String的坑
public static String escapeDatabaseKeywords(String keyword) { public static String escapeDatabaseKeywords(String keyword) {
if (databaseKeywords == null) { if (databaseKeywords == null) {
if (isOracleMode()) { if (isOracleMode()) {
@ -103,11 +102,6 @@ public class ObWriterUtils {
} }
private static int[] getColumnIndex(List<String> columnsInIndex, List<String> allColumns) { private static int[] getColumnIndex(List<String> columnsInIndex, List<String> allColumns) {
/**
* JDK8的stream模型将一种数据结构转化成通用的数据模型并可在该模型上进行操作
* map:接受一个函数引用用于操作元素
* collect接受一个Collectors方法用于将中间数据模型转化成目标数据结构
*/
for (int i = 0; i < allColumns.size(); i++) { for (int i = 0; i < allColumns.size(); i++) {
if (!ObWriterUtils.isEscapeMode(allColumns.get(i))) { if (!ObWriterUtils.isEscapeMode(allColumns.get(i))) {
allColumns.set(i, allColumns.get(i).toUpperCase()); allColumns.set(i, allColumns.get(i).toUpperCase());
@ -167,7 +161,7 @@ public class ObWriterUtils {
String columnName = rs.getString("Column_name"); String columnName = rs.getString("Column_name");
columnName=escapeDatabaseKeywords(columnName); columnName=escapeDatabaseKeywords(columnName);
if(!ObWriterUtils.isEscapeMode(columnName)){ if(!ObWriterUtils.isEscapeMode(columnName)){
columnName=columnName.toUpperCase(); columnName = columnName.toUpperCase();
} }
List<String> s = uniqueKeys.get(keyName); List<String> s = uniqueKeys.get(keyName);
if (s == null) { if (s == null) {