mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 17:40:28 +08:00
修改了问题
This commit is contained in:
parent
dcb541f048
commit
9f09039a23
@ -1,13 +1,5 @@
|
|||||||
package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext;
|
package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext;
|
||||||
|
|
||||||
import java.sql.*;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.element.Column;
|
import com.alibaba.datax.common.element.Column;
|
||||||
import com.alibaba.datax.common.element.Record;
|
import com.alibaba.datax.common.element.Record;
|
||||||
import com.alibaba.datax.common.plugin.RecordSender;
|
import com.alibaba.datax.common.plugin.RecordSender;
|
||||||
@ -19,11 +11,17 @@ import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
|
|||||||
import com.alibaba.datax.plugin.rdbms.reader.Constant;
|
import com.alibaba.datax.plugin.rdbms.reader.Constant;
|
||||||
import com.alibaba.datax.plugin.rdbms.reader.Key;
|
import com.alibaba.datax.plugin.rdbms.reader.Key;
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
|
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
|
||||||
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
|
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
|
||||||
import com.alibaba.datax.plugin.reader.oceanbasev10reader.Config;
|
import com.alibaba.datax.plugin.reader.oceanbasev10reader.Config;
|
||||||
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils;
|
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils;
|
||||||
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.TaskContext;
|
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.TaskContext;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.sql.*;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
public class ReaderTask extends CommonRdbmsReader.Task {
|
public class ReaderTask extends CommonRdbmsReader.Task {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(ReaderTask.class);
|
private static final Logger LOG = LoggerFactory.getLogger(ReaderTask.class);
|
||||||
@ -64,13 +62,9 @@ public class ReaderTask extends CommonRdbmsReader.Task {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (ObReaderUtils.databaseType == DataBaseType.OceanBase) {
|
jdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:") + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时
|
||||||
jdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:") + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时
|
if (ObReaderUtils.compatibleMode == ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE) {
|
||||||
} else {
|
compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE;
|
||||||
jdbcUrl = jdbcUrl + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时
|
|
||||||
}
|
|
||||||
if(ObReaderUtils.compatibleMode==ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE){
|
|
||||||
compatibleMode=ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE;
|
|
||||||
}
|
}
|
||||||
LOG.info("this is ob1_0 jdbc url. user=" + username + " :url=" + jdbcUrl);
|
LOG.info("this is ob1_0 jdbc url. user=" + username + " :url=" + jdbcUrl);
|
||||||
mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, "");
|
mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, "");
|
||||||
|
@ -717,6 +717,6 @@ public class ObReaderUtils {
|
|||||||
}
|
}
|
||||||
|
|
||||||
public static boolean isOracleMode(String mode) {
|
public static boolean isOracleMode(String mode) {
|
||||||
return (mode != null && OB_COMPATIBLE_MODE_ORACLE.equals(mode.toString().toUpperCase()));
|
return (mode != null && OB_COMPATIBLE_MODE_ORACLE.equalsIgnoreCase(mode));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -1,15 +1,5 @@
|
|||||||
package com.alibaba.datax.plugin.writer.oceanbasev10writer;
|
package com.alibaba.datax.plugin.writer.oceanbasev10writer;
|
||||||
|
|
||||||
import java.sql.*;
|
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.List;
|
|
||||||
|
|
||||||
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.DbUtils;
|
|
||||||
import com.alibaba.fastjson.JSONObject;
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.plugin.RecordReceiver;
|
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||||
import com.alibaba.datax.common.spi.Writer;
|
import com.alibaba.datax.common.spi.Writer;
|
||||||
import com.alibaba.datax.common.util.Configuration;
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
@ -20,7 +10,16 @@ import com.alibaba.datax.plugin.rdbms.writer.Constant;
|
|||||||
import com.alibaba.datax.plugin.rdbms.writer.Key;
|
import com.alibaba.datax.plugin.rdbms.writer.Key;
|
||||||
import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil;
|
import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil;
|
||||||
import com.alibaba.datax.plugin.writer.oceanbasev10writer.task.ConcurrentTableWriterTask;
|
import com.alibaba.datax.plugin.writer.oceanbasev10writer.task.ConcurrentTableWriterTask;
|
||||||
|
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.DbUtils;
|
||||||
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
|
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
|
||||||
|
import com.alibaba.fastjson.JSONObject;
|
||||||
|
import org.apache.commons.lang3.StringUtils;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.sql.Connection;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* 2016-04-07
|
* 2016-04-07
|
||||||
@ -61,17 +60,17 @@ public class OceanBaseV10Writer extends Writer {
|
|||||||
this.originalConfig = super.getPluginJobConf();
|
this.originalConfig = super.getPluginJobConf();
|
||||||
checkCompatibleMode(originalConfig);
|
checkCompatibleMode(originalConfig);
|
||||||
//将config中的column和table中的关键字进行转义
|
//将config中的column和table中的关键字进行转义
|
||||||
List<String> columns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.writer.Key.COLUMN, String.class);
|
List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
|
||||||
ObWriterUtils.escapeDatabaseKeywords(columns);
|
ObWriterUtils.escapeDatabaseKeywords(columns);
|
||||||
originalConfig.set(com.alibaba.datax.plugin.rdbms.writer.Key.COLUMN, columns);
|
originalConfig.set(Key.COLUMN, columns);
|
||||||
|
|
||||||
List<JSONObject> conns = originalConfig.getList(com.alibaba.datax.plugin.rdbms.writer.Constant.CONN_MARK, JSONObject.class);
|
List<JSONObject> conns = originalConfig.getList(Constant.CONN_MARK, JSONObject.class);
|
||||||
for (int i = 0; i < conns.size(); i++) {
|
for (int i = 0; i < conns.size(); i++) {
|
||||||
JSONObject conn = conns.get(i);
|
JSONObject conn = conns.get(i);
|
||||||
Configuration connConfig = Configuration.from(conn.toString());
|
Configuration connConfig = Configuration.from(conn.toString());
|
||||||
List<String> tables = connConfig.getList(com.alibaba.datax.plugin.rdbms.writer.Key.TABLE, String.class);
|
List<String> tables = connConfig.getList(Key.TABLE, String.class);
|
||||||
ObWriterUtils.escapeDatabaseKeywords(tables);
|
ObWriterUtils.escapeDatabaseKeywords(tables);
|
||||||
originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.writer.Constant.CONN_MARK, i, com.alibaba.datax.plugin.rdbms.writer.Key.TABLE), tables);
|
originalConfig.set(String.format("%s[%d].%s", Constant.CONN_MARK, i, Key.TABLE), tables);
|
||||||
}
|
}
|
||||||
this.commonJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);
|
this.commonJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);
|
||||||
this.commonJob.init(this.originalConfig);
|
this.commonJob.init(this.originalConfig);
|
||||||
@ -235,6 +234,7 @@ public class OceanBaseV10Writer extends Writer {
|
|||||||
/**
|
/**
|
||||||
* 注意:此方法每个 Task 都会执行一次。 最佳实践:此处适当封装确保简洁清晰完成数据写入工作。
|
* 注意:此方法每个 Task 都会执行一次。 最佳实践:此处适当封装确保简洁清晰完成数据写入工作。
|
||||||
*/
|
*/
|
||||||
|
@Override
|
||||||
public void startWrite(RecordReceiver recordReceiver) {
|
public void startWrite(RecordReceiver recordReceiver) {
|
||||||
this.writerTask.startWrite(recordReceiver, this.writerSliceConfig, super.getTaskPluginCollector());
|
this.writerTask.startWrite(recordReceiver, this.writerSliceConfig, super.getTaskPluginCollector());
|
||||||
}
|
}
|
||||||
|
@ -1,7 +1,27 @@
|
|||||||
package com.alibaba.datax.plugin.writer.oceanbasev10writer.task;
|
package com.alibaba.datax.plugin.writer.oceanbasev10writer.task;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.element.Column;
|
||||||
|
import com.alibaba.datax.common.element.Record;
|
||||||
|
import com.alibaba.datax.common.exception.DataXException;
|
||||||
|
import com.alibaba.datax.common.plugin.RecordReceiver;
|
||||||
|
import com.alibaba.datax.common.plugin.TaskPluginCollector;
|
||||||
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
|
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
|
||||||
|
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
|
||||||
|
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
||||||
|
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
|
||||||
|
import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config;
|
||||||
|
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ConnHolder;
|
||||||
|
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder;
|
||||||
|
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo;
|
||||||
|
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
|
||||||
|
import com.alipay.oceanbase.obproxy.data.TableEntryKey;
|
||||||
|
import com.alipay.oceanbase.obproxy.util.ObPartitionIdCalculator;
|
||||||
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
//import java.sql.PreparedStatement;
|
|
||||||
import java.sql.PreparedStatement;
|
import java.sql.PreparedStatement;
|
||||||
import java.sql.SQLException;
|
import java.sql.SQLException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
@ -15,29 +35,8 @@ import java.util.concurrent.atomic.AtomicLong;
|
|||||||
import java.util.concurrent.locks.Condition;
|
import java.util.concurrent.locks.Condition;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
import java.util.regex.Pattern;
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.element.Column;
|
//import java.sql.PreparedStatement;
|
||||||
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder;
|
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.element.Record;
|
|
||||||
import com.alibaba.datax.common.exception.DataXException;
|
|
||||||
import com.alibaba.datax.common.plugin.RecordReceiver;
|
|
||||||
import com.alibaba.datax.common.plugin.TaskPluginCollector;
|
|
||||||
import com.alibaba.datax.common.util.Configuration;
|
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
|
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
|
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
|
||||||
import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
|
|
||||||
import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config;
|
|
||||||
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ConnHolder;
|
|
||||||
import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo;
|
|
||||||
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
|
|
||||||
import com.alipay.oceanbase.obproxy.data.TableEntryKey;
|
|
||||||
import com.alipay.oceanbase.obproxy.util.ObPartitionIdCalculator;
|
|
||||||
|
|
||||||
public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
|
public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentTableWriterTask.class);
|
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentTableWriterTask.class);
|
||||||
@ -105,14 +104,14 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
|
|||||||
connectInfo.getFullUserName(), connectInfo.password);
|
connectInfo.getFullUserName(), connectInfo.password);
|
||||||
checkConnHolder.initConnection();
|
checkConnHolder.initConnection();
|
||||||
if (isOracleCompatibleMode) {
|
if (isOracleCompatibleMode) {
|
||||||
connectInfo.databaseName = connectInfo.databaseName.toUpperCase();
|
connectInfo.databaseName = connectInfo.databaseName.toUpperCase();
|
||||||
//在转义的情况下不翻译
|
//在转义的情况下不翻译
|
||||||
if(!Pattern.matches("\"\\w*\"",table)){
|
if (table.startsWith("\"") && table.endsWith("\"")) {
|
||||||
table = table.toUpperCase();
|
table = table.toUpperCase();
|
||||||
}
|
}
|
||||||
|
|
||||||
LOG.info(String.format("this is oracle compatible mode, change database to %s, table to %s",
|
LOG.info(String.format("this is oracle compatible mode, change database to %s, table to %s",
|
||||||
connectInfo.databaseName, table));
|
connectInfo.databaseName, table));
|
||||||
}
|
}
|
||||||
|
|
||||||
if (config.getBool(Config.USE_PART_CALCULATOR, Config.DEFAULT_USE_PART_CALCULATOR)) {
|
if (config.getBool(Config.USE_PART_CALCULATOR, Config.DEFAULT_USE_PART_CALCULATOR)) {
|
||||||
|
@ -41,7 +41,6 @@ public class ObWriterUtils {
|
|||||||
if (databaseKeywords.contains(keyword.toUpperCase())) {
|
if (databaseKeywords.contains(keyword.toUpperCase())) {
|
||||||
keyword = escapeChar + keyword + escapeChar;
|
keyword = escapeChar + keyword + escapeChar;
|
||||||
}
|
}
|
||||||
keyword = keyword.toLowerCase();
|
|
||||||
keywords.set(i, keyword);
|
keywords.set(i, keyword);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user