From ce29ae7ee5d7ac5163c30b80ef0b41c361ced7b8 Mon Sep 17 00:00:00 2001 From: sanChouIsACat <993924507@qq.com> Date: Tue, 28 Dec 2021 18:25:16 +0800 Subject: [PATCH] =?UTF-8?q?=E6=9C=AC=E6=AC=A1=E6=8F=90=E4=BA=A4=E6=9B=B4?= =?UTF-8?q?=E6=96=B0=E4=BA=86=E8=BD=AC=E4=B9=89=E7=9A=84=E5=AD=97=E7=AC=A6?= =?UTF-8?q?=E4=B8=8D=E9=9C=80=E8=A6=81=E8=BF=9B=E8=A1=8C=E5=A4=A7=E5=B0=8F?= =?UTF-8?q?=E5=86=99=E8=BD=AC=E6=8D=A2?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../util/ObReaderUtils.java | 71 +++++++++---------- .../task/ConcurrentTableWriterTask.java | 38 +++++++--- .../util/ObWriterUtils.java | 44 +++++++++--- 3 files changed, 96 insertions(+), 57 deletions(-) diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java index 878aaea6..20c2f922 100644 --- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java +++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java @@ -1,37 +1,22 @@ package com.alibaba.datax.plugin.reader.oceanbasev10reader.util; -import java.sql.Connection; -import java.sql.PreparedStatement; -import java.sql.ResultSet; -import java.sql.SQLException; -import java.sql.Statement; -import java.sql.Timestamp; -import java.util.*; -import java.util.Map.Entry; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - +import com.alibaba.datax.common.element.*; +import com.alibaba.datax.plugin.rdbms.util.DBUtil; import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.druid.sql.SQLUtils; +import com.alibaba.druid.sql.ast.SQLExpr; +import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr; +import com.alibaba.druid.sql.ast.expr.SQLBinaryOperator; import org.apache.commons.lang3.ArrayUtils; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.alibaba.datax.common.element.BoolColumn; -import com.alibaba.datax.common.element.BytesColumn; -import com.alibaba.datax.common.element.Column; -import com.alibaba.datax.common.element.DateColumn; -import com.alibaba.datax.common.element.DoubleColumn; -import com.alibaba.datax.common.element.LongColumn; -import com.alibaba.datax.common.element.Record; -import com.alibaba.datax.common.element.StringColumn; -import com.alibaba.datax.plugin.rdbms.util.DBUtil; -import com.alibaba.druid.sql.SQLUtils; -import com.alibaba.druid.sql.ast.SQLExpr; -import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr; -import com.alibaba.druid.sql.ast.expr.SQLBinaryOperator; - -import javax.xml.crypto.Data; +import java.sql.*; +import java.util.*; +import java.util.Map.Entry; +import java.util.regex.Matcher; +import java.util.regex.Pattern; public class ObReaderUtils { @@ -53,21 +38,32 @@ public class ObReaderUtils { return new HashSet(Arrays.asList(keywords.split(","))); } - public static void escapeDatabaseKeywords(List keywords) { + public static String escapeDatabaseKeywords(String keyword) { if (databaseKeywords == null) { - if (isOracleMode(compatibleMode.toString())) { + if (isOracleMode(compatibleMode)) { databaseKeywords = keywordsFromString2HashSet(ORACLE_KEYWORDS); } else { databaseKeywords = keywordsFromString2HashSet(MYSQL_KEYWORDS); } } - char escapeChar = isOracleMode(compatibleMode.toString()) ? '"' : '`'; + char escapeChar = isOracleMode(compatibleMode) ? '"' : '`'; + if (databaseKeywords.contains(keyword.toUpperCase())) { + keyword = escapeChar + keyword + escapeChar; + } + return keyword; + } + + public static void escapeDatabaseKeywords(List keywords) { for (int i = 0; i < keywords.size(); i++) { - String keyword = keywords.get(i); - if (databaseKeywords.contains(keyword.toUpperCase())) { - keyword = escapeChar + keyword + escapeChar; - } - keywords.set(i, keyword); + keywords.set(i, escapeDatabaseKeywords(keywords.get(i))); + } + } + + public static Boolean isEscapeMode(String keyword) { + if (isOracleMode(compatibleMode)) { + return keyword.startsWith("\"") && keyword.endsWith("\""); + } else { + return keyword.startsWith("`") && keyword.endsWith("`"); } } @@ -151,12 +147,15 @@ public class ObReaderUtils { ps = conn.createStatement(); rs = ps.executeQuery(sql); while (rs.next()) { - String columnName = StringUtils.lowerCase(rs.getString("Column_name")); + String columnName = rs.getString("Column_name"); + columnName = escapeDatabaseKeywords(columnName); + if (!isEscapeMode(columnName)) { + columnName.toLowerCase(); + } if (!realIndex.contains(columnName)) { realIndex.add(columnName); } } - escapeDatabaseKeywords(realIndex); String[] pks = new String[realIndex.size()]; realIndex.toArray(pks); diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java index 1e6d27c6..eb1222c3 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java @@ -62,6 +62,7 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { private ObPartitionIdCalculator partCalculator = null; private HashMap> groupInsertValues; + List unknownPartRecords = new ArrayList(); // private List unknownPartRecords; private List partitionKeyIndexes; @@ -307,6 +308,18 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { } } } + if(unknownPartRecords.size()>0){ + int retry = 0; + while (true) { + try { + concurrentWriter.addBatchRecords(unknownPartRecords); + break; + } catch (InterruptedException e) { + retry++; + LOG.info("Concurrent table writer is interrupted, retry {}", retry); + } + } + } } private void addRecordToCache(final Record record) { @@ -347,21 +360,24 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task { groupInsertValues.put(partId, groupValues); } } else { - LOG.warn("add unknown part record {}", record); - List unknownPartRecords = new ArrayList(); + LOG.debug("add unknown part record {}", record); + unknownPartRecords.add(record); int i = 0; - while (true) { - if (i > 0) { - LOG.info("retry add batch record the {} times", i); - } - try { - concurrentWriter.addBatchRecords(unknownPartRecords); - break; - } catch (InterruptedException e) { - LOG.info("Concurrent table writer is interrupted"); + if(unknownPartRecords.size()>batchSize){ + while (true) { + if (i > 0) { + LOG.info("retry add batch record the {} times", i); + } + try { + concurrentWriter.addBatchRecords(unknownPartRecords); + break; + } catch (InterruptedException e) { + LOG.info("Concurrent table writer is interrupted"); + } } } + } } diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java index 072a0f21..f4c4f439 100644 --- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java +++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java @@ -11,7 +11,6 @@ import org.slf4j.LoggerFactory; import java.sql.*; import java.util.*; -import java.util.stream.Collectors; public class ObWriterUtils { @@ -26,8 +25,8 @@ public class ObWriterUtils { return new HashSet(Arrays.asList(keywords.split(","))); } - public static void escapeDatabaseKeywords(List keywords) { - //判断是否需要更改关键字集合 + //java中的String的坑 + public static String escapeDatabaseKeywords(String keyword) { if (databaseKeywords == null) { if (isOracleMode()) { databaseKeywords = keywordsFromString2HashSet(ORACLE_KEYWORDS); @@ -36,12 +35,22 @@ public class ObWriterUtils { } } char escapeChar = isOracleMode() ? '"' : '`'; + if (databaseKeywords.contains(keyword.toUpperCase())) { + keyword = escapeChar + keyword + escapeChar; + } + return keyword; + } + + public static void escapeDatabaseKeywords(List keywords) { for (int i = 0; i < keywords.size(); i++) { - String keyword = keywords.get(i); - if (databaseKeywords.contains(keyword.toUpperCase())) { - keyword = escapeChar + keyword + escapeChar; - } - keywords.set(i, keyword); + keywords.set(i, escapeDatabaseKeywords(keywords.get(i))); + } + } + public static Boolean isEscapeMode(String keyword){ + if(isOracleMode()){ + return keyword.startsWith("\"") && keyword.endsWith("\""); + }else{ + return keyword.startsWith("`") && keyword.endsWith("`"); } } public static boolean isMemstoreFull(Connection conn, double memstoreThreshold) { @@ -94,7 +103,16 @@ public class ObWriterUtils { } private static int[] getColumnIndex(List columnsInIndex, List allColumns) { - allColumns = allColumns.stream().map(String::toUpperCase).collect(Collectors.toList()); + /** + * JDK8的stream模型:将一种数据结构转化成通用的数据模型,并可在该模型上进行操作 + * map:接受一个函数引用,用于操作元素 + * collect:接受一个Collectors方法,用于将中间数据模型转化成目标数据结构 + */ + for (int i = 0; i < allColumns.size(); i++) { + if (!ObWriterUtils.isEscapeMode(allColumns.get(i))) { + allColumns.set(i, allColumns.get(i).toUpperCase()); + } + } int[] colIdx = new int[columnsInIndex.size()]; for (int i = 0; i < columnsInIndex.size(); i++) { int index = allColumns.indexOf(columnsInIndex.get(i)); @@ -146,7 +164,11 @@ public class ObWriterUtils { rs = stmt.executeQuery(sql); while (rs.next()) { String keyName = rs.getString("Key_name"); - String columnName = StringUtils.upperCase(rs.getString("Column_name")); + String columnName = rs.getString("Column_name"); + columnName=escapeDatabaseKeywords(columnName); + if(!ObWriterUtils.isEscapeMode(columnName)){ + columnName=columnName.toUpperCase(); + } List s = uniqueKeys.get(keyName); if (s == null) { s = new ArrayList(); @@ -159,6 +181,7 @@ public class ObWriterUtils { } finally { asyncClose(rs, stmt, null); } + //ObWriterUtils.escapeDatabaseKeywords(uniqueKeys); return uniqueKeys; } @@ -315,6 +338,7 @@ public class ObWriterUtils { * @param e * @return */ + public static boolean isFatalError(SQLException e) { String sqlState = e.getSQLState(); if (StringUtils.startsWith(sqlState, "08")) {