Merge branch 'master' into feat/upgrade-databend

This commit is contained in:
Jeremy 2023-04-12 17:30:52 +08:00 committed by GitHub
commit 9f57abc736
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
28 changed files with 66 additions and 355 deletions

View File

@ -26,7 +26,7 @@ DataX本身作为数据同步框架将不同数据源的同步抽象为从源
# Quick Start
##### Download [DataX下载地址](https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz)
##### Download [DataX下载地址](https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202303/datax.tar.gz)
##### 请点击:[Quick Start](https://github.com/alibaba/DataX/blob/master/userGuid.md)
@ -70,6 +70,7 @@ DataX目前已经有了比较全面的插件体系主流的RDBMS数据库、N
| | Databend | | √ | [](https://github.com/alibaba/DataX/blob/master/databendwriter/doc/databendwriter.md) |
| | Hive | √ | √ | [](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) |
| | kudu | | √ | [](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) |
| | selectdb | | √ | [](https://github.com/alibaba/DataX/blob/master/selectdbwriter/doc/selectdbwriter.md) |
| 无结构化数据存储 | TxtFile | √ | √ | [](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md) |
| | FTP | √ | √ | [](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md) |
| | HDFS | √ | √ | [](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) |
@ -108,6 +109,12 @@ DataX目前已经有了比较全面的插件体系主流的RDBMS数据库、N
DataX 后续计划月度迭代更新,也欢迎感兴趣的同学提交 Pull requests,月度更新内容会介绍如下。
- [datax_v202303](https://github.com/alibaba/DataX/releases/tag/datax_v202303)
- 精简代码
- 新增插件adbmysqlwriter、databendwriter、selectdbwriter
- 优化插件、修复问题sqlserver、hdfs、cassandra、kudu、oss
- fastjson 升级到 fastjson2
- [datax_v202210](https://github.com/alibaba/DataX/releases/tag/datax_v202210)
- 涉及通道能力更新OceanBase、Tdengine、Doris等

View File

@ -110,7 +110,6 @@ DataX 将数据直连ADS接口利用ADS暴露的INSERT接口直写到ADS。
"account": "xxx@aliyun.com",
"odpsServer": "xxx",
"tunnelServer": "xxx",
"accountType": "aliyun",
"project": "transfer_project"
},
"writeMode": "load",

View File

@ -12,7 +12,6 @@ public class TransferProjectConf {
public final static String KEY_ACCOUNT = "odps.account";
public final static String KEY_ODPS_SERVER = "odps.odpsServer";
public final static String KEY_ODPS_TUNNEL = "odps.tunnelServer";
public final static String KEY_ACCOUNT_TYPE = "odps.accountType";
public final static String KEY_PROJECT = "odps.project";
private String accessId;
@ -20,7 +19,6 @@ public class TransferProjectConf {
private String account;
private String odpsServer;
private String odpsTunnel;
private String accountType;
private String project;
public static TransferProjectConf create(Configuration adsWriterConf) {
@ -30,7 +28,6 @@ public class TransferProjectConf {
res.account = adsWriterConf.getString(KEY_ACCOUNT);
res.odpsServer = adsWriterConf.getString(KEY_ODPS_SERVER);
res.odpsTunnel = adsWriterConf.getString(KEY_ODPS_TUNNEL);
res.accountType = adsWriterConf.getString(KEY_ACCOUNT_TYPE, "aliyun");
res.project = adsWriterConf.getString(KEY_PROJECT);
return res;
}
@ -55,9 +52,6 @@ public class TransferProjectConf {
return odpsTunnel;
}
public String getAccountType() {
return accountType;
}
public String getProject() {
return project;

View File

@ -31,7 +31,6 @@ public class PerfTrace {
private int taskGroupId;
private int channelNumber;
private int priority;
private int batchSize = 500;
private volatile boolean perfReportEnable = true;
@ -54,12 +53,12 @@ public class PerfTrace {
* @param taskGroupId
* @return
*/
public static PerfTrace getInstance(boolean isJob, long jobId, int taskGroupId, int priority, boolean enable) {
public static PerfTrace getInstance(boolean isJob, long jobId, int taskGroupId, boolean enable) {
if (instance == null) {
synchronized (lock) {
if (instance == null) {
instance = new PerfTrace(isJob, jobId, taskGroupId, priority, enable);
instance = new PerfTrace(isJob, jobId, taskGroupId, enable);
}
}
}
@ -76,22 +75,21 @@ public class PerfTrace {
LOG.error("PerfTrace instance not be init! must have some error! ");
synchronized (lock) {
if (instance == null) {
instance = new PerfTrace(false, -1111, -1111, 0, false);
instance = new PerfTrace(false, -1111, -1111, false);
}
}
}
return instance;
}
private PerfTrace(boolean isJob, long jobId, int taskGroupId, int priority, boolean enable) {
private PerfTrace(boolean isJob, long jobId, int taskGroupId, boolean enable) {
try {
this.perfTraceId = isJob ? "job_" + jobId : String.format("taskGroup_%s_%s", jobId, taskGroupId);
this.enable = enable;
this.isJob = isJob;
this.taskGroupId = taskGroupId;
this.instId = jobId;
this.priority = priority;
LOG.info(String.format("PerfTrace traceId=%s, isEnable=%s, priority=%s", this.perfTraceId, this.enable, this.priority));
LOG.info(String.format("PerfTrace traceId=%s, isEnable=%s", this.perfTraceId, this.enable));
} catch (Exception e) {
// do nothing
@ -398,7 +396,6 @@ public class PerfTrace {
jdo.setWindowEnd(this.windowEnd);
jdo.setJobStartTime(jobStartTime);
jdo.setJobRunTimeMs(System.currentTimeMillis() - jobStartTime.getTime());
jdo.setJobPriority(this.priority);
jdo.setChannelNum(this.channelNumber);
jdo.setCluster(this.cluster);
jdo.setJobDomain(this.jobDomain);
@ -609,7 +606,6 @@ public class PerfTrace {
private Date jobStartTime;
private Date jobEndTime;
private Long jobRunTimeMs;
private Integer jobPriority;
private Integer channelNum;
private String cluster;
private String jobDomain;
@ -680,10 +676,6 @@ public class PerfTrace {
return jobRunTimeMs;
}
public Integer getJobPriority() {
return jobPriority;
}
public Integer getChannelNum() {
return channelNum;
}
@ -816,10 +808,6 @@ public class PerfTrace {
this.jobRunTimeMs = jobRunTimeMs;
}
public void setJobPriority(Integer jobPriority) {
this.jobPriority = jobPriority;
}
public void setChannelNum(Integer channelNum) {
this.channelNum = channelNum;
}

View File

@ -1,62 +0,0 @@
package com.alibaba.datax.common.util;
import java.util.Map;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.alibaba.datax.common.exception.DataXException;
/**
 * Resolves the ODPS accessId/accessKey pair from the SKYNET_* environment
 * variables (the accessKey is stored DES-encrypted in the environment).
 * Used by readers/writers as a fallback when the job configuration does not
 * supply credentials explicitly.
 */
public class IdAndKeyRollingUtil {
    private static final Logger LOGGER = LoggerFactory.getLogger(IdAndKeyRollingUtil.class);

    // Environment variable names carrying the credentials.
    public static final String SKYNET_ACCESSID = "SKYNET_ACCESSID";
    public static final String SKYNET_ACCESSKEY = "SKYNET_ACCESSKEY";

    // Configuration keys the resolved credentials are written under.
    public final static String ACCESS_ID = "accessId";
    public final static String ACCESS_KEY = "accessKey";

    /**
     * Decrypts the accessKey from the SKYNET_ACCESSKEY environment variable.
     *
     * @return the decrypted accessKey, or {@code null} when SKYNET_ACCESSKEY
     *         is absent/blank in the environment
     * @throws DataXException when the variable is present but decryption
     *         yields a blank value
     */
    public static String parseAkFromSkynetAccessKey() {
        Map<String, String> envProp = System.getenv();
        String skynetAccessID = envProp.get(IdAndKeyRollingUtil.SKYNET_ACCESSID);
        String skynetAccessKey = envProp.get(IdAndKeyRollingUtil.SKYNET_ACCESSKEY);
        String accessKey = null;
        // Historically SKYNET_ACCESSID/SKYNET_ACCESSKEY were assumed to be set
        // together; we now only attempt decryption when the encrypted key
        // itself is present.
        if (StringUtils.isNotBlank(skynetAccessKey)) {
            LOGGER.info("Try to get accessId/accessKey from environment SKYNET_ACCESSKEY.");
            accessKey = DESCipher.decrypt(skynetAccessKey);
            if (StringUtils.isBlank(accessKey)) {
                // Present in the environment but failed to decrypt.
                throw DataXException.asDataXException(String.format(
                        "Failed to get the [accessId]/[accessKey] from the environment variable. The [accessId]=[%s]",
                        skynetAccessID));
            }
        }
        if (StringUtils.isNotBlank(accessKey)) {
            LOGGER.info("Get accessId/accessKey from environment variables SKYNET_ACCESSKEY successfully.");
        }
        return accessKey;
    }

    /**
     * Reads the credentials from the environment and, when an accessKey could
     * be resolved, stores both values into {@code originalConfig} under the
     * {@link #ACCESS_ID}/{@link #ACCESS_KEY} keys.
     *
     * @param originalConfig the plugin configuration to populate (mutated in place)
     * @return the resolved accessKey, or {@code null}/blank when unavailable
     */
    public static String getAccessIdAndKeyFromEnv(Configuration originalConfig) {
        Map<String, String> envProp = System.getenv();
        String accessId = envProp.get(IdAndKeyRollingUtil.SKYNET_ACCESSID);
        // Fix: the original guarded this call with StringUtils.isBlank() on a
        // freshly-declared null accessKey — a check that is always true — so
        // the call is made unconditionally here; behavior is unchanged.
        String accessKey = IdAndKeyRollingUtil.parseAkFromSkynetAccessKey();
        if (StringUtils.isNotBlank(accessKey)) {
            // Callers of this helper use the accessId/accessKey naming convention.
            originalConfig.set(IdAndKeyRollingUtil.ACCESS_ID, accessId);
            originalConfig.set(IdAndKeyRollingUtil.ACCESS_KEY, accessKey);
        }
        return accessKey;
    }
}

View File

@ -79,16 +79,9 @@ public class Engine {
perfReportEnable = false;
}
int priority = 0;
try {
priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
}catch (NumberFormatException e){
LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY"));
}
Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
//初始化PerfTrace
PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, traceEnable);
perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber);
container.start();

View File

@ -53,6 +53,8 @@ public class ElasticSearchClient {
public ElasticSearchClient(Configuration conf) {
this.conf = conf;
String endpoint = Key.getEndpoint(conf);
//es是支持集群写入的
String[] endpoints = endpoint.split(",");
String user = Key.getUsername(conf);
String passwd = Key.getPassword(conf);
boolean multiThread = Key.isMultiThread(conf);
@ -63,7 +65,7 @@ public class ElasticSearchClient {
int totalConnection = this.conf.getInt("maxTotalConnection", 200);
JestClientFactory factory = new JestClientFactory();
Builder httpClientConfig = new HttpClientConfig
.Builder(endpoint)
.Builder(Arrays.asList(endpoints))
// .setPreemptiveAuth(new HttpHost(endpoint))
.multiThreaded(multiThread)
.connTimeout(readTimeout)

View File

@ -24,7 +24,7 @@ FtpWriter实现了从DataX协议转为FTP文件功能FTP文件本身是无结
我们不能做到:
1. 单个文件不能支持并发写入。
1. 单个文件并发写入。
## 3 功能说明

View File

@ -331,9 +331,11 @@ public class DFSUtil {
//If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds
//Each file as a split
//TODO multy threads
InputSplit[] splits = in.getSplits(conf, 1);
RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL);
// OrcInputFormat getSplits params numSplits not used, splits size = block numbers
InputSplit[] splits = in.getSplits(conf, -1);
for (InputSplit split : splits) {
{
RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL);
Object key = reader.createKey();
Object value = reader.createValue();
// 获取列信息
@ -351,6 +353,8 @@ public class DFSUtil {
taskPluginCollector, isReadAllColumns, nullFormat);
}
reader.close();
}
}
} catch (Exception e) {
String message = String.format("从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。"
, sourceOrcFilePath);

View File

@ -134,7 +134,7 @@ public class KuduWriterTask {
break;
case BOOLEAN:
synchronized (lock) {
row.addBoolean(name, Boolean.getBoolean(rawData));
row.addBoolean(name, Boolean.parseBoolean(rawData));
}
break;
case STRING:

View File

@ -114,8 +114,7 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据通过主控的J
"accessKey": "********************",
"truncate": true,
"odpsServer": "xxx/api",
"tunnelServer": "xxx",
"accountType": "aliyun"
"tunnelServer": "xxx"
}
}
}

View File

@ -14,20 +14,9 @@ public class Constant {
public static final String PARTITION_SPLIT_MODE = "partition";
public static final String DEFAULT_ACCOUNT_TYPE = "aliyun";
public static final String TAOBAO_ACCOUNT_TYPE = "taobao";
// 常量字段用COLUMN_CONSTANT_FLAG 首尾包住即可
public final static String COLUMN_CONSTANT_FLAG = "'";
/**
* 以下是获取accesskey id 需要用到的常量值
*/
public static final String SKYNET_ACCESSID = "SKYNET_ACCESSID";
public static final String SKYNET_ACCESSKEY = "SKYNET_ACCESSKEY";
public static final String PARTITION_COLUMNS = "partitionColumns";
public static final String PARSED_COLUMNS = "parsedColumns";

View File

@ -24,9 +24,6 @@ public class Key {
// 当值为partition 则只切分到分区当值为record则当按照分区切分后达不到adviceNum时继续按照record切分
public final static String SPLIT_MODE = "splitMode";
// 账号类型默认为aliyun也可能为taobao等其他类型
public final static String ACCOUNT_TYPE = "accountType";
public final static String PACKAGE_AUTHORIZED_PROJECT = "packageAuthorizedProject";
public final static String IS_COMPRESS = "isCompress";

View File

@ -42,12 +42,6 @@ public class OdpsReader extends Reader {
this.originalConfig = super.getPluginJobConf();
this.successOnNoPartition = this.originalConfig.getBool(Key.SUCCESS_ON_NO_PATITION, false);
//如果用户没有配置accessId/accessKey,尝试从环境变量获取
String accountType = originalConfig.getString(Key.ACCOUNT_TYPE, Constant.DEFAULT_ACCOUNT_TYPE);
if (Constant.DEFAULT_ACCOUNT_TYPE.equalsIgnoreCase(accountType)) {
this.originalConfig = IdAndKeyUtil.parseAccessIdAndKey(this.originalConfig);
}
//检查必要的参数配置
OdpsUtil.checkNecessaryConfig(this.originalConfig);
//重试次数的配置检查

View File

@ -1,65 +0,0 @@
/**
* (C) 2010-2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.datax.plugin.reader.odpsreader.util;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.IdAndKeyRollingUtil;
import com.alibaba.datax.common.util.MessageSource;
import com.alibaba.datax.plugin.reader.odpsreader.Key;
import com.alibaba.datax.plugin.reader.odpsreader.OdpsReaderErrorCode;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
 * Resolves the ODPS accessId/accessKey for the odpsreader plugin: prefers
 * explicit job configuration, falling back to the SKYNET_* environment
 * variables via {@link IdAndKeyRollingUtil}.
 */
public class IdAndKeyUtil {
    private static final Logger LOG = LoggerFactory.getLogger(IdAndKeyUtil.class);
    private static final MessageSource MESSAGE_SOURCE = MessageSource.loadResourceBundle(IdAndKeyUtil.class);

    /**
     * Ensures {@code originalConfig} contains a usable accessId/accessKey pair.
     * If the user configured either value, both are required from the config;
     * otherwise the credentials are pulled from the environment.
     *
     * @param originalConfig the reader job configuration (mutated in place when
     *        credentials come from the environment)
     * @return the same configuration instance, with credentials validated/filled
     * @throws DataXException when credentials can be obtained from neither source
     */
    public static Configuration parseAccessIdAndKey(Configuration originalConfig) {
        String accessId = originalConfig.getString(Key.ACCESS_ID);
        String accessKey = originalConfig.getString(Key.ACCESS_KEY);
        // Configuring either of accessId/accessKey signals the user intends to
        // supply both manually.
        if (StringUtils.isNotBlank(accessId) || StringUtils.isNotBlank(accessKey)) {
            LOG.info("Try to get accessId/accessKey from your config.");
            // getNecessaryValue throws if either required value is missing.
            accessId = originalConfig.getNecessaryValue(Key.ACCESS_ID, OdpsReaderErrorCode.REQUIRED_VALUE);
            accessKey = originalConfig.getNecessaryValue(Key.ACCESS_KEY, OdpsReaderErrorCode.REQUIRED_VALUE);
            return originalConfig;
        } else {
            return getAccessIdAndKeyFromEnv(originalConfig);
        }
    }

    /**
     * Pulls the credentials from the environment and stores them into the
     * configuration. Fix: the original version fetched {@code System.getenv()}
     * at the call site and passed it through an {@code envProp} parameter that
     * was never read; the dead parameter and redundant fetch are removed.
     */
    private static Configuration getAccessIdAndKeyFromEnv(Configuration originalConfig) {
        // On success the ak has already been written into originalConfig by
        // IdAndKeyRollingUtil.getAccessIdAndKeyFromEnv.
        String accessKey = IdAndKeyRollingUtil.getAccessIdAndKeyFromEnv(originalConfig);
        if (StringUtils.isBlank(accessKey)) {
            // Unavailable from both the job configuration and the environment.
            throw DataXException.asDataXException(OdpsReaderErrorCode.GET_ID_KEY_FAIL,
                    MESSAGE_SOURCE.message("idandkeyutil.2"));
        }
        return originalConfig;
    }
}

View File

@ -76,20 +76,13 @@ public final class OdpsUtil {
defaultProject = packageAuthorizedProject;
}
String accountType = originalConfig.getString(Key.ACCOUNT_TYPE,
Constant.DEFAULT_ACCOUNT_TYPE);
Account account = null;
if (accountType.equalsIgnoreCase(Constant.DEFAULT_ACCOUNT_TYPE)) {
if (StringUtils.isNotBlank(securityToken)) {
account = new StsAccount(accessId, accessKey, securityToken);
} else {
account = new AliyunAccount(accessId, accessKey);
}
} else {
throw DataXException.asDataXException(OdpsReaderErrorCode.ACCOUNT_TYPE_ERROR,
MESSAGE_SOURCE.message("odpsutil.3", accountType));
}
Odps odps = new Odps(account);
boolean isPreCheck = originalConfig.getBool("dryRun", false);

View File

@ -71,8 +71,7 @@ ODPSWriter插件用于实现往ODPS插入或者更新数据主要提供给etl
"accessKey": "xxxx",
"truncate": true,
"odpsServer": "http://sxxx/api",
"tunnelServer": "http://xxx",
"accountType": "aliyun"
"tunnelServer": "http://xxx"
}
}
}

View File

@ -2,13 +2,6 @@ package com.alibaba.datax.plugin.writer.odpswriter;
public class Constant {
public static final String SKYNET_ACCESSID = "SKYNET_ACCESSID";
public static final String SKYNET_ACCESSKEY = "SKYNET_ACCESSKEY";
public static final String DEFAULT_ACCOUNT_TYPE = "aliyun";
public static final String TAOBAO_ACCOUNT_TYPE = "taobao";
public static final String COLUMN_POSITION = "columnPosition";

View File

@ -30,8 +30,6 @@ public final class Key {
//boolean 类型default:false
public final static String EMPTY_AS_NULL = "emptyAsNull";
public final static String ACCOUNT_TYPE = "accountType";
public final static String IS_COMPRESS = "isCompress";
// preSql

View File

@ -62,7 +62,6 @@ public class OdpsWriter extends Writer {
private String tableName;
private String tunnelServer;
private String partition;
private String accountType;
private boolean truncate;
private String uploadId;
private TableTunnel.UploadSession masterUpload;
@ -104,8 +103,6 @@ public class OdpsWriter extends Writer {
this.tableName = this.originalConfig.getString(Key.TABLE);
this.tunnelServer = this.originalConfig.getString(Key.TUNNEL_SERVER, null);
this.dealAK();
// init odps config
this.odps = OdpsUtil.initOdpsProject(this.originalConfig);
@ -153,31 +150,6 @@ public class OdpsWriter extends Writer {
}
}
private void dealAK() {
this.accountType = this.originalConfig.getString(Key.ACCOUNT_TYPE,
Constant.DEFAULT_ACCOUNT_TYPE);
if (!Constant.DEFAULT_ACCOUNT_TYPE.equalsIgnoreCase(this.accountType) &&
!Constant.TAOBAO_ACCOUNT_TYPE.equalsIgnoreCase(this.accountType)) {
throw DataXException.asDataXException(OdpsWriterErrorCode.ACCOUNT_TYPE_ERROR,
MESSAGE_SOURCE.message("odpswriter.1", accountType));
}
this.originalConfig.set(Key.ACCOUNT_TYPE, this.accountType);
//检查accessId,accessKey配置
if (Constant.DEFAULT_ACCOUNT_TYPE
.equalsIgnoreCase(this.accountType)) {
this.originalConfig = IdAndKeyUtil.parseAccessIdAndKey(this.originalConfig);
String accessId = this.originalConfig.getString(Key.ACCESS_ID);
String accessKey = this.originalConfig.getString(Key.ACCESS_KEY);
if (IS_DEBUG) {
LOG.debug("accessId:[{}], accessKey:[{}] .", accessId,
accessKey);
}
LOG.info("accessId:[{}] .", accessId);
}
}
private void dealDynamicPartition() {
/*
* 如果显示配置了 supportDynamicPartition则以配置为准
@ -241,20 +213,6 @@ public class OdpsWriter extends Writer {
@Override
public void prepare() {
String accessId = null;
String accessKey = null;
if (Constant.DEFAULT_ACCOUNT_TYPE
.equalsIgnoreCase(this.accountType)) {
this.originalConfig = IdAndKeyUtil.parseAccessIdAndKey(this.originalConfig);
accessId = this.originalConfig.getString(Key.ACCESS_ID);
accessKey = this.originalConfig.getString(Key.ACCESS_KEY);
if (IS_DEBUG) {
LOG.debug("accessId:[{}], accessKey:[{}] .", accessId,
accessKey);
}
LOG.info("accessId:[{}] .", accessId);
}
// init odps config
this.odps = OdpsUtil.initOdpsProject(this.originalConfig);

View File

@ -1,65 +0,0 @@
/**
* (C) 2010-2022 Alibaba Group Holding Limited.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.alibaba.datax.plugin.writer.odpswriter.util;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.IdAndKeyRollingUtil;
import com.alibaba.datax.common.util.MessageSource;
import com.alibaba.datax.plugin.writer.odpswriter.Key;
import com.alibaba.datax.plugin.writer.odpswriter.OdpsWriterErrorCode;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Map;
/**
 * Resolves the ODPS accessId/accessKey for the odpswriter plugin: prefers
 * explicit job configuration, falling back to the SKYNET_* environment
 * variables via {@link IdAndKeyRollingUtil}.
 */
public class IdAndKeyUtil {
    private static final Logger LOG = LoggerFactory.getLogger(IdAndKeyUtil.class);
    private static final MessageSource MESSAGE_SOURCE = MessageSource.loadResourceBundle(IdAndKeyUtil.class);

    /**
     * Ensures {@code originalConfig} contains a usable accessId/accessKey pair.
     * If the user configured either value, both are required from the config;
     * otherwise the credentials are pulled from the environment.
     *
     * @param originalConfig the writer job configuration (mutated in place when
     *        credentials come from the environment)
     * @return the same configuration instance, with credentials validated/filled
     * @throws DataXException when credentials can be obtained from neither source
     */
    public static Configuration parseAccessIdAndKey(Configuration originalConfig) {
        String accessId = originalConfig.getString(Key.ACCESS_ID);
        String accessKey = originalConfig.getString(Key.ACCESS_KEY);
        // Configuring either of accessId/accessKey signals the user intends to
        // supply both manually.
        if (StringUtils.isNotBlank(accessId) || StringUtils.isNotBlank(accessKey)) {
            LOG.info("Try to get accessId/accessKey from your config.");
            // getNecessaryValue throws if either required value is missing.
            accessId = originalConfig.getNecessaryValue(Key.ACCESS_ID, OdpsWriterErrorCode.REQUIRED_VALUE);
            accessKey = originalConfig.getNecessaryValue(Key.ACCESS_KEY, OdpsWriterErrorCode.REQUIRED_VALUE);
            return originalConfig;
        } else {
            return getAccessIdAndKeyFromEnv(originalConfig);
        }
    }

    /**
     * Pulls the credentials from the environment and stores them into the
     * configuration. Fix: the original version fetched {@code System.getenv()}
     * at the call site and passed it through an {@code envProp} parameter that
     * was never read; the dead parameter and redundant fetch are removed.
     */
    private static Configuration getAccessIdAndKeyFromEnv(Configuration originalConfig) {
        // On success the ak has already been written into originalConfig by
        // IdAndKeyRollingUtil.getAccessIdAndKeyFromEnv.
        String accessKey = IdAndKeyRollingUtil.getAccessIdAndKeyFromEnv(originalConfig);
        if (StringUtils.isBlank(accessKey)) {
            // Unavailable from both the job configuration and the environment.
            throw DataXException.asDataXException(OdpsWriterErrorCode.GET_ID_KEY_FAIL,
                    MESSAGE_SOURCE.message("idandkeyutil.2"));
        }
        return originalConfig;
    }
}

View File

@ -79,7 +79,6 @@ public class OdpsUtil {
public static Odps initOdpsProject(Configuration originalConfig) {
String accountType = originalConfig.getString(Key.ACCOUNT_TYPE);
String accessId = originalConfig.getString(Key.ACCESS_ID);
String accessKey = originalConfig.getString(Key.ACCESS_KEY);
@ -88,16 +87,11 @@ public class OdpsUtil {
String securityToken = originalConfig.getString(Key.SECURITY_TOKEN);
Account account;
if (accountType.equalsIgnoreCase(Constant.DEFAULT_ACCOUNT_TYPE)) {
if (StringUtils.isNotBlank(securityToken)) {
account = new com.aliyun.odps.account.StsAccount(accessId, accessKey, securityToken);
} else {
account = new AliyunAccount(accessId, accessKey);
}
} else {
throw DataXException.asDataXException(OdpsWriterErrorCode.ACCOUNT_TYPE_ERROR,
MESSAGE_SOURCE.message("odpsutil.4", accountType));
}
Odps odps = new Odps(account);
boolean isPreCheck = originalConfig.getBool("dryRun", false);

View File

@ -10,6 +10,7 @@ import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.Connection;
import java.util.ArrayList;
import java.util.List;
@ -120,9 +121,15 @@ public final class OriginalConfPretreatmentUtil {
} else {
// 确保用户配置的 column 不重复
ListUtil.makeSureNoValueDuplicate(userConfiguredColumns, false);
Connection connection = null;
try {
connection = connectionFactory.getConnecttion();
// 检查列是否都为数据库表中正确的列通过执行一次 select column from table 进行判断
DBUtil.getColumnMetaData(connectionFactory.getConnecttion(), oneTable,StringUtils.join(userConfiguredColumns, ","));
DBUtil.getColumnMetaData(connection, oneTable,StringUtils.join(userConfiguredColumns, ","));
} finally {
DBUtil.closeDBResources(null, connection);
}
}
}
}

View File

@ -22,7 +22,7 @@
<commons-lang3-version>3.3.2</commons-lang3-version>
<commons-configuration-version>1.10</commons-configuration-version>
<commons-cli-version>1.2</commons-cli-version>
<fastjson-version>2.0.19</fastjson-version>
<fastjson-version>2.0.23</fastjson-version>
<guava-version>16.0.1</guava-version>
<diamond.version>3.7.2.1-SNAPSHOT</diamond.version>

View File

@ -64,7 +64,6 @@
<dependency>
<groupId>com.alibaba.fastjson2</groupId>
<artifactId>fastjson2</artifactId>
<version>2.0.23</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
@ -98,10 +97,6 @@
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<relocations>
<relocation>
<pattern>com.alibaba.fastjson</pattern>
<shadedPattern>com.starrocks.shade.com.alibaba.fastjson</shadedPattern>
</relocation>
<relocation>
<pattern>org.apache.http</pattern>
<shadedPattern>com.starrocks.shade.org.apache.http</shadedPattern>
@ -118,7 +113,6 @@
<include>commons-logging:*</include>
<include>org.apache.httpcomponents:httpclient</include>
<include>org.apache.httpcomponents:httpcore</include>
<include>com.alibaba:fastjson</include>
</includes>
</artifactSet>
<filters>

View File

@ -182,6 +182,7 @@ public class TxtFileReader extends Reader {
delimiterInStr));
}
UnstructuredStorageReaderUtil.validateCsvReaderConfig(this.originConfig);
}
@Override