diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java index 1376f113..a43dcebd 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java @@ -63,7 +63,7 @@ public class ReaderTask extends CommonRdbmsReader.Task { } jdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:") + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时 - if (ObReaderUtils.compatibleMode == ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE) { + if (ObReaderUtils.compatibleMode.equals(ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE)) { compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE; } LOG.info("this is ob1_0 jdbc url. user=" + username + " :url=" + jdbcUrl); diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java index bed0598d..bd34b4a6 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java @@ -294,31 +294,14 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { } private void addLeftRecords() { + //不需要刷新Cache,已经是最后一批数据了 for (List groupValues : groupInsertValues.values()) { if (groupValues.size() > 0 ) { - int retry = 0; - while (true) { - try { - concurrentWriter.addBatchRecords(groupValues); - break; - } catch (InterruptedException e) { - retry++; - LOG.info("Concurrent table writer is interrupted, retry {}", retry); - } - } + addRecordsToWriteQueue(groupValues); } } if (unknownPartRecords.size() > 0) { - int retry = 0; - while (true) { - try { - concurrentWriter.addBatchRecords(unknownPartRecords); - break; - } catch (InterruptedException e) { - retry++; - LOG.info("Concurrent table writer is interrupted, retry {}", retry); - } - } + addRecordsToWriteQueue(unknownPartRecords); } } @@ -344,43 +327,40 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { groupValues.add(record); if (groupValues.size() >= batchSize) { int i = 0; - while (true) { - if (i > 0) { - LOG.info("retry add batch record the {} times", i); - } - try { - concurrentWriter.addBatchRecords(groupValues); - printEveryTime(); - break; - } catch (InterruptedException e) { - LOG.info("Concurrent table writer is interrupted"); - } - } - groupValues = new ArrayList(batchSize); + groupValues =addRecordsToWriteQueue(groupValues); groupInsertValues.put(partId, groupValues); } } else { LOG.debug("add unknown part record {}", record); - unknownPartRecords.add(record); - int i = 0; if (unknownPartRecords.size() > batchSize) { - while (true) { - if (i > 0) { - LOG.info("retry add batch record the {} times", i); - } - try { - concurrentWriter.addBatchRecords(unknownPartRecords); - break; - } catch (InterruptedException e) { - LOG.info("Concurrent table writer is interrupted"); - } - } + unknownPartRecords=addRecordsToWriteQueue(unknownPartRecords); } } } + /** + * + * @param records + * @return 返回一个新的Cache用于存储接下来的数据 + */ + private List addRecordsToWriteQueue(List records) { + int i = 0; + while (true) { + if (i > 0) { + LOG.info("retry add batch record the {} times", i); + } + try { + concurrentWriter.addBatchRecords(records); + break; + } catch (InterruptedException e) { + i++; + LOG.info("Concurrent table writer is interrupted"); + } + } + return new ArrayList(batchSize); + } private void checkMemStore() { Connection checkConn = checkConnHolder.reconnect(); long now = System.currentTimeMillis(); diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java index f4c4f439..ff1648a1 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java @@ -25,7 +25,6 @@ public class ObWriterUtils { return new HashSet(Arrays.asList(keywords.split(","))); } - //java中的String的坑 public static String escapeDatabaseKeywords(String keyword) { if (databaseKeywords == null) { if (isOracleMode()) { @@ -103,11 +102,6 @@ public class ObWriterUtils { } private static int[] getColumnIndex(List columnsInIndex, List allColumns) { - /** - * JDK8的stream模型:将一种数据结构转化成通用的数据模型,并可在该模型上进行操作 - * map:接受一个函数引用,用于操作元素 - * collect:接受一个Collectors方法,用于将中间数据模型转化成目标数据结构 - */ for (int i = 0; i < allColumns.size(); i++) { if (!ObWriterUtils.isEscapeMode(allColumns.get(i))) { allColumns.set(i, allColumns.get(i).toUpperCase()); @@ -167,7 +161,7 @@ public class ObWriterUtils { String columnName = rs.getString("Column_name"); columnName=escapeDatabaseKeywords(columnName); if(!ObWriterUtils.isEscapeMode(columnName)){ - columnName=columnName.toUpperCase(); + columnName = columnName.toUpperCase(); } List s = uniqueKeys.get(keyName); if (s == null) {