mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-03 01:30:50 +08:00
datax 202302
This commit is contained in:
parent
f01836a080
commit
f79f742d7c
@ -110,7 +110,6 @@ DataX 将数据直连ADS接口,利用ADS暴露的INSERT接口直写到ADS。
|
|||||||
"account": "xxx@aliyun.com",
|
"account": "xxx@aliyun.com",
|
||||||
"odpsServer": "xxx",
|
"odpsServer": "xxx",
|
||||||
"tunnelServer": "xxx",
|
"tunnelServer": "xxx",
|
||||||
"accountType": "aliyun",
|
|
||||||
"project": "transfer_project"
|
"project": "transfer_project"
|
||||||
},
|
},
|
||||||
"writeMode": "load",
|
"writeMode": "load",
|
||||||
|
@ -12,7 +12,6 @@ public class TransferProjectConf {
|
|||||||
public final static String KEY_ACCOUNT = "odps.account";
|
public final static String KEY_ACCOUNT = "odps.account";
|
||||||
public final static String KEY_ODPS_SERVER = "odps.odpsServer";
|
public final static String KEY_ODPS_SERVER = "odps.odpsServer";
|
||||||
public final static String KEY_ODPS_TUNNEL = "odps.tunnelServer";
|
public final static String KEY_ODPS_TUNNEL = "odps.tunnelServer";
|
||||||
public final static String KEY_ACCOUNT_TYPE = "odps.accountType";
|
|
||||||
public final static String KEY_PROJECT = "odps.project";
|
public final static String KEY_PROJECT = "odps.project";
|
||||||
|
|
||||||
private String accessId;
|
private String accessId;
|
||||||
@ -20,7 +19,6 @@ public class TransferProjectConf {
|
|||||||
private String account;
|
private String account;
|
||||||
private String odpsServer;
|
private String odpsServer;
|
||||||
private String odpsTunnel;
|
private String odpsTunnel;
|
||||||
private String accountType;
|
|
||||||
private String project;
|
private String project;
|
||||||
|
|
||||||
public static TransferProjectConf create(Configuration adsWriterConf) {
|
public static TransferProjectConf create(Configuration adsWriterConf) {
|
||||||
@ -30,7 +28,6 @@ public class TransferProjectConf {
|
|||||||
res.account = adsWriterConf.getString(KEY_ACCOUNT);
|
res.account = adsWriterConf.getString(KEY_ACCOUNT);
|
||||||
res.odpsServer = adsWriterConf.getString(KEY_ODPS_SERVER);
|
res.odpsServer = adsWriterConf.getString(KEY_ODPS_SERVER);
|
||||||
res.odpsTunnel = adsWriterConf.getString(KEY_ODPS_TUNNEL);
|
res.odpsTunnel = adsWriterConf.getString(KEY_ODPS_TUNNEL);
|
||||||
res.accountType = adsWriterConf.getString(KEY_ACCOUNT_TYPE, "aliyun");
|
|
||||||
res.project = adsWriterConf.getString(KEY_PROJECT);
|
res.project = adsWriterConf.getString(KEY_PROJECT);
|
||||||
return res;
|
return res;
|
||||||
}
|
}
|
||||||
@ -55,9 +52,6 @@ public class TransferProjectConf {
|
|||||||
return odpsTunnel;
|
return odpsTunnel;
|
||||||
}
|
}
|
||||||
|
|
||||||
public String getAccountType() {
|
|
||||||
return accountType;
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getProject() {
|
public String getProject() {
|
||||||
return project;
|
return project;
|
||||||
|
@ -31,7 +31,6 @@ public class PerfTrace {
|
|||||||
private int taskGroupId;
|
private int taskGroupId;
|
||||||
private int channelNumber;
|
private int channelNumber;
|
||||||
|
|
||||||
private int priority;
|
|
||||||
private int batchSize = 500;
|
private int batchSize = 500;
|
||||||
private volatile boolean perfReportEnable = true;
|
private volatile boolean perfReportEnable = true;
|
||||||
|
|
||||||
@ -54,12 +53,12 @@ public class PerfTrace {
|
|||||||
* @param taskGroupId
|
* @param taskGroupId
|
||||||
* @return
|
* @return
|
||||||
*/
|
*/
|
||||||
public static PerfTrace getInstance(boolean isJob, long jobId, int taskGroupId, int priority, boolean enable) {
|
public static PerfTrace getInstance(boolean isJob, long jobId, int taskGroupId, boolean enable) {
|
||||||
|
|
||||||
if (instance == null) {
|
if (instance == null) {
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
if (instance == null) {
|
if (instance == null) {
|
||||||
instance = new PerfTrace(isJob, jobId, taskGroupId, priority, enable);
|
instance = new PerfTrace(isJob, jobId, taskGroupId, enable);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -76,22 +75,21 @@ public class PerfTrace {
|
|||||||
LOG.error("PerfTrace instance not be init! must have some error! ");
|
LOG.error("PerfTrace instance not be init! must have some error! ");
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
if (instance == null) {
|
if (instance == null) {
|
||||||
instance = new PerfTrace(false, -1111, -1111, 0, false);
|
instance = new PerfTrace(false, -1111, -1111, false);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return instance;
|
return instance;
|
||||||
}
|
}
|
||||||
|
|
||||||
private PerfTrace(boolean isJob, long jobId, int taskGroupId, int priority, boolean enable) {
|
private PerfTrace(boolean isJob, long jobId, int taskGroupId, boolean enable) {
|
||||||
try {
|
try {
|
||||||
this.perfTraceId = isJob ? "job_" + jobId : String.format("taskGroup_%s_%s", jobId, taskGroupId);
|
this.perfTraceId = isJob ? "job_" + jobId : String.format("taskGroup_%s_%s", jobId, taskGroupId);
|
||||||
this.enable = enable;
|
this.enable = enable;
|
||||||
this.isJob = isJob;
|
this.isJob = isJob;
|
||||||
this.taskGroupId = taskGroupId;
|
this.taskGroupId = taskGroupId;
|
||||||
this.instId = jobId;
|
this.instId = jobId;
|
||||||
this.priority = priority;
|
LOG.info(String.format("PerfTrace traceId=%s, isEnable=%s", this.perfTraceId, this.enable));
|
||||||
LOG.info(String.format("PerfTrace traceId=%s, isEnable=%s, priority=%s", this.perfTraceId, this.enable, this.priority));
|
|
||||||
|
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
// do nothing
|
// do nothing
|
||||||
@ -398,7 +396,6 @@ public class PerfTrace {
|
|||||||
jdo.setWindowEnd(this.windowEnd);
|
jdo.setWindowEnd(this.windowEnd);
|
||||||
jdo.setJobStartTime(jobStartTime);
|
jdo.setJobStartTime(jobStartTime);
|
||||||
jdo.setJobRunTimeMs(System.currentTimeMillis() - jobStartTime.getTime());
|
jdo.setJobRunTimeMs(System.currentTimeMillis() - jobStartTime.getTime());
|
||||||
jdo.setJobPriority(this.priority);
|
|
||||||
jdo.setChannelNum(this.channelNumber);
|
jdo.setChannelNum(this.channelNumber);
|
||||||
jdo.setCluster(this.cluster);
|
jdo.setCluster(this.cluster);
|
||||||
jdo.setJobDomain(this.jobDomain);
|
jdo.setJobDomain(this.jobDomain);
|
||||||
@ -609,7 +606,6 @@ public class PerfTrace {
|
|||||||
private Date jobStartTime;
|
private Date jobStartTime;
|
||||||
private Date jobEndTime;
|
private Date jobEndTime;
|
||||||
private Long jobRunTimeMs;
|
private Long jobRunTimeMs;
|
||||||
private Integer jobPriority;
|
|
||||||
private Integer channelNum;
|
private Integer channelNum;
|
||||||
private String cluster;
|
private String cluster;
|
||||||
private String jobDomain;
|
private String jobDomain;
|
||||||
@ -680,10 +676,6 @@ public class PerfTrace {
|
|||||||
return jobRunTimeMs;
|
return jobRunTimeMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getJobPriority() {
|
|
||||||
return jobPriority;
|
|
||||||
}
|
|
||||||
|
|
||||||
public Integer getChannelNum() {
|
public Integer getChannelNum() {
|
||||||
return channelNum;
|
return channelNum;
|
||||||
}
|
}
|
||||||
@ -816,10 +808,6 @@ public class PerfTrace {
|
|||||||
this.jobRunTimeMs = jobRunTimeMs;
|
this.jobRunTimeMs = jobRunTimeMs;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setJobPriority(Integer jobPriority) {
|
|
||||||
this.jobPriority = jobPriority;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setChannelNum(Integer channelNum) {
|
public void setChannelNum(Integer channelNum) {
|
||||||
this.channelNum = channelNum;
|
this.channelNum = channelNum;
|
||||||
}
|
}
|
||||||
|
@ -1,62 +0,0 @@
|
|||||||
package com.alibaba.datax.common.util;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.exception.DataXException;
|
|
||||||
|
|
||||||
public class IdAndKeyRollingUtil {
|
|
||||||
private static Logger LOGGER = LoggerFactory.getLogger(IdAndKeyRollingUtil.class);
|
|
||||||
public static final String SKYNET_ACCESSID = "SKYNET_ACCESSID";
|
|
||||||
public static final String SKYNET_ACCESSKEY = "SKYNET_ACCESSKEY";
|
|
||||||
|
|
||||||
public final static String ACCESS_ID = "accessId";
|
|
||||||
public final static String ACCESS_KEY = "accessKey";
|
|
||||||
|
|
||||||
public static String parseAkFromSkynetAccessKey() {
|
|
||||||
Map<String, String> envProp = System.getenv();
|
|
||||||
String skynetAccessID = envProp.get(IdAndKeyRollingUtil.SKYNET_ACCESSID);
|
|
||||||
String skynetAccessKey = envProp.get(IdAndKeyRollingUtil.SKYNET_ACCESSKEY);
|
|
||||||
String accessKey = null;
|
|
||||||
// follow 原有的判断条件
|
|
||||||
// 环境变量中,如果存在SKYNET_ACCESSID/SKYNET_ACCESSKEy(只要有其中一个变量,则认为一定是两个都存在的!
|
|
||||||
// if (StringUtils.isNotBlank(skynetAccessID) ||
|
|
||||||
// StringUtils.isNotBlank(skynetAccessKey)) {
|
|
||||||
// 检查严格,只有加密串不为空的时候才进去,不过 之前能跑的加密串都不应该为空
|
|
||||||
if (StringUtils.isNotBlank(skynetAccessKey)) {
|
|
||||||
LOGGER.info("Try to get accessId/accessKey from environment SKYNET_ACCESSKEY.");
|
|
||||||
accessKey = DESCipher.decrypt(skynetAccessKey);
|
|
||||||
if (StringUtils.isBlank(accessKey)) {
|
|
||||||
// 环境变量里面有,但是解析不到
|
|
||||||
throw DataXException.asDataXException(String.format(
|
|
||||||
"Failed to get the [accessId]/[accessKey] from the environment variable. The [accessId]=[%s]",
|
|
||||||
skynetAccessID));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
if (StringUtils.isNotBlank(accessKey)) {
|
|
||||||
LOGGER.info("Get accessId/accessKey from environment variables SKYNET_ACCESSKEY successfully.");
|
|
||||||
}
|
|
||||||
return accessKey;
|
|
||||||
}
|
|
||||||
|
|
||||||
public static String getAccessIdAndKeyFromEnv(Configuration originalConfig) {
|
|
||||||
String accessId = null;
|
|
||||||
Map<String, String> envProp = System.getenv();
|
|
||||||
accessId = envProp.get(IdAndKeyRollingUtil.SKYNET_ACCESSID);
|
|
||||||
String accessKey = null;
|
|
||||||
if (StringUtils.isBlank(accessKey)) {
|
|
||||||
// 老的没有出异常,只是获取不到ak
|
|
||||||
accessKey = IdAndKeyRollingUtil.parseAkFromSkynetAccessKey();
|
|
||||||
}
|
|
||||||
|
|
||||||
if (StringUtils.isNotBlank(accessKey)) {
|
|
||||||
// 确认使用这个的都是 accessId、accessKey的命名习惯
|
|
||||||
originalConfig.set(IdAndKeyRollingUtil.ACCESS_ID, accessId);
|
|
||||||
originalConfig.set(IdAndKeyRollingUtil.ACCESS_KEY, accessKey);
|
|
||||||
}
|
|
||||||
return accessKey;
|
|
||||||
}
|
|
||||||
}
|
|
@ -79,16 +79,9 @@ public class Engine {
|
|||||||
perfReportEnable = false;
|
perfReportEnable = false;
|
||||||
}
|
}
|
||||||
|
|
||||||
int priority = 0;
|
|
||||||
try {
|
|
||||||
priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY"));
|
|
||||||
}catch (NumberFormatException e){
|
|
||||||
LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY"));
|
|
||||||
}
|
|
||||||
|
|
||||||
Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
|
Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO);
|
||||||
//初始化PerfTrace
|
//初始化PerfTrace
|
||||||
PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable);
|
PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, traceEnable);
|
||||||
perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber);
|
perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber);
|
||||||
container.start();
|
container.start();
|
||||||
|
|
||||||
|
@ -134,7 +134,7 @@ public class KuduWriterTask {
|
|||||||
break;
|
break;
|
||||||
case BOOLEAN:
|
case BOOLEAN:
|
||||||
synchronized (lock) {
|
synchronized (lock) {
|
||||||
row.addBoolean(name, Boolean.getBoolean(rawData));
|
row.addBoolean(name, Boolean.parseBoolean(rawData));
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case STRING:
|
case STRING:
|
||||||
|
@ -114,8 +114,7 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据,通过主控的J
|
|||||||
"accessKey": "********************",
|
"accessKey": "********************",
|
||||||
"truncate": true,
|
"truncate": true,
|
||||||
"odpsServer": "xxx/api",
|
"odpsServer": "xxx/api",
|
||||||
"tunnelServer": "xxx",
|
"tunnelServer": "xxx"
|
||||||
"accountType": "aliyun"
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -14,20 +14,9 @@ public class Constant {
|
|||||||
|
|
||||||
public static final String PARTITION_SPLIT_MODE = "partition";
|
public static final String PARTITION_SPLIT_MODE = "partition";
|
||||||
|
|
||||||
public static final String DEFAULT_ACCOUNT_TYPE = "aliyun";
|
|
||||||
|
|
||||||
public static final String TAOBAO_ACCOUNT_TYPE = "taobao";
|
|
||||||
|
|
||||||
// 常量字段用COLUMN_CONSTANT_FLAG 首尾包住即可
|
// 常量字段用COLUMN_CONSTANT_FLAG 首尾包住即可
|
||||||
public final static String COLUMN_CONSTANT_FLAG = "'";
|
public final static String COLUMN_CONSTANT_FLAG = "'";
|
||||||
|
|
||||||
/**
|
|
||||||
* 以下是获取accesskey id 需要用到的常量值
|
|
||||||
*/
|
|
||||||
public static final String SKYNET_ACCESSID = "SKYNET_ACCESSID";
|
|
||||||
|
|
||||||
public static final String SKYNET_ACCESSKEY = "SKYNET_ACCESSKEY";
|
|
||||||
|
|
||||||
public static final String PARTITION_COLUMNS = "partitionColumns";
|
public static final String PARTITION_COLUMNS = "partitionColumns";
|
||||||
|
|
||||||
public static final String PARSED_COLUMNS = "parsedColumns";
|
public static final String PARSED_COLUMNS = "parsedColumns";
|
||||||
|
@ -24,9 +24,6 @@ public class Key {
|
|||||||
// 当值为:partition 则只切分到分区;当值为:record,则当按照分区切分后达不到adviceNum时,继续按照record切分
|
// 当值为:partition 则只切分到分区;当值为:record,则当按照分区切分后达不到adviceNum时,继续按照record切分
|
||||||
public final static String SPLIT_MODE = "splitMode";
|
public final static String SPLIT_MODE = "splitMode";
|
||||||
|
|
||||||
// 账号类型,默认为aliyun,也可能为taobao等其他类型
|
|
||||||
public final static String ACCOUNT_TYPE = "accountType";
|
|
||||||
|
|
||||||
public final static String PACKAGE_AUTHORIZED_PROJECT = "packageAuthorizedProject";
|
public final static String PACKAGE_AUTHORIZED_PROJECT = "packageAuthorizedProject";
|
||||||
|
|
||||||
public final static String IS_COMPRESS = "isCompress";
|
public final static String IS_COMPRESS = "isCompress";
|
||||||
|
@ -42,12 +42,6 @@ public class OdpsReader extends Reader {
|
|||||||
this.originalConfig = super.getPluginJobConf();
|
this.originalConfig = super.getPluginJobConf();
|
||||||
this.successOnNoPartition = this.originalConfig.getBool(Key.SUCCESS_ON_NO_PATITION, false);
|
this.successOnNoPartition = this.originalConfig.getBool(Key.SUCCESS_ON_NO_PATITION, false);
|
||||||
|
|
||||||
//如果用户没有配置accessId/accessKey,尝试从环境变量获取
|
|
||||||
String accountType = originalConfig.getString(Key.ACCOUNT_TYPE, Constant.DEFAULT_ACCOUNT_TYPE);
|
|
||||||
if (Constant.DEFAULT_ACCOUNT_TYPE.equalsIgnoreCase(accountType)) {
|
|
||||||
this.originalConfig = IdAndKeyUtil.parseAccessIdAndKey(this.originalConfig);
|
|
||||||
}
|
|
||||||
|
|
||||||
//检查必要的参数配置
|
//检查必要的参数配置
|
||||||
OdpsUtil.checkNecessaryConfig(this.originalConfig);
|
OdpsUtil.checkNecessaryConfig(this.originalConfig);
|
||||||
//重试次数的配置检查
|
//重试次数的配置检查
|
||||||
|
@ -1,65 +0,0 @@
|
|||||||
/**
|
|
||||||
* (C) 2010-2022 Alibaba Group Holding Limited.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package com.alibaba.datax.plugin.reader.odpsreader.util;
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.exception.DataXException;
|
|
||||||
import com.alibaba.datax.common.util.Configuration;
|
|
||||||
import com.alibaba.datax.common.util.IdAndKeyRollingUtil;
|
|
||||||
import com.alibaba.datax.common.util.MessageSource;
|
|
||||||
import com.alibaba.datax.plugin.reader.odpsreader.Key;
|
|
||||||
import com.alibaba.datax.plugin.reader.odpsreader.OdpsReaderErrorCode;
|
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
public class IdAndKeyUtil {
|
|
||||||
private static Logger LOG = LoggerFactory.getLogger(IdAndKeyUtil.class);
|
|
||||||
private static MessageSource MESSAGE_SOURCE = MessageSource.loadResourceBundle(IdAndKeyUtil.class);
|
|
||||||
|
|
||||||
public static Configuration parseAccessIdAndKey(Configuration originalConfig) {
|
|
||||||
String accessId = originalConfig.getString(Key.ACCESS_ID);
|
|
||||||
String accessKey = originalConfig.getString(Key.ACCESS_KEY);
|
|
||||||
|
|
||||||
// 只要 accessId,accessKey 二者配置了一个,就理解为是用户本意是要直接手动配置其 accessid/accessKey
|
|
||||||
if (StringUtils.isNotBlank(accessId) || StringUtils.isNotBlank(accessKey)) {
|
|
||||||
LOG.info("Try to get accessId/accessKey from your config.");
|
|
||||||
//通过如下语句,进行检查是否确实配置了
|
|
||||||
accessId = originalConfig.getNecessaryValue(Key.ACCESS_ID, OdpsReaderErrorCode.REQUIRED_VALUE);
|
|
||||||
accessKey = originalConfig.getNecessaryValue(Key.ACCESS_KEY, OdpsReaderErrorCode.REQUIRED_VALUE);
|
|
||||||
//检查完毕,返回即可
|
|
||||||
return originalConfig;
|
|
||||||
} else {
|
|
||||||
Map<String, String> envProp = System.getenv();
|
|
||||||
return getAccessIdAndKeyFromEnv(originalConfig, envProp);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Configuration getAccessIdAndKeyFromEnv(Configuration originalConfig,
|
|
||||||
Map<String, String> envProp) {
|
|
||||||
// 如果获取到ak,在getAccessIdAndKeyFromEnv中已经设置到originalConfig了
|
|
||||||
String accessKey = IdAndKeyRollingUtil.getAccessIdAndKeyFromEnv(originalConfig);
|
|
||||||
if (StringUtils.isBlank(accessKey)) {
|
|
||||||
// 无处获取(既没有配置在作业中,也没用在环境变量中)
|
|
||||||
throw DataXException.asDataXException(OdpsReaderErrorCode.GET_ID_KEY_FAIL,
|
|
||||||
MESSAGE_SOURCE.message("idandkeyutil.2"));
|
|
||||||
}
|
|
||||||
return originalConfig;
|
|
||||||
}
|
|
||||||
}
|
|
@ -76,19 +76,12 @@ public final class OdpsUtil {
|
|||||||
defaultProject = packageAuthorizedProject;
|
defaultProject = packageAuthorizedProject;
|
||||||
}
|
}
|
||||||
|
|
||||||
String accountType = originalConfig.getString(Key.ACCOUNT_TYPE,
|
|
||||||
Constant.DEFAULT_ACCOUNT_TYPE);
|
|
||||||
|
|
||||||
Account account = null;
|
Account account = null;
|
||||||
if (accountType.equalsIgnoreCase(Constant.DEFAULT_ACCOUNT_TYPE)) {
|
if (StringUtils.isNotBlank(securityToken)) {
|
||||||
if (StringUtils.isNotBlank(securityToken)) {
|
account = new StsAccount(accessId, accessKey, securityToken);
|
||||||
account = new StsAccount(accessId, accessKey, securityToken);
|
|
||||||
} else {
|
|
||||||
account = new AliyunAccount(accessId, accessKey);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
throw DataXException.asDataXException(OdpsReaderErrorCode.ACCOUNT_TYPE_ERROR,
|
account = new AliyunAccount(accessId, accessKey);
|
||||||
MESSAGE_SOURCE.message("odpsutil.3", accountType));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Odps odps = new Odps(account);
|
Odps odps = new Odps(account);
|
||||||
|
@ -71,8 +71,7 @@ ODPSWriter插件用于实现往ODPS插入或者更新数据,主要提供给etl
|
|||||||
"accessKey": "xxxx",
|
"accessKey": "xxxx",
|
||||||
"truncate": true,
|
"truncate": true,
|
||||||
"odpsServer": "http://sxxx/api",
|
"odpsServer": "http://sxxx/api",
|
||||||
"tunnelServer": "http://xxx",
|
"tunnelServer": "http://xxx"
|
||||||
"accountType": "aliyun"
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,13 +2,6 @@ package com.alibaba.datax.plugin.writer.odpswriter;
|
|||||||
|
|
||||||
|
|
||||||
public class Constant {
|
public class Constant {
|
||||||
public static final String SKYNET_ACCESSID = "SKYNET_ACCESSID";
|
|
||||||
|
|
||||||
public static final String SKYNET_ACCESSKEY = "SKYNET_ACCESSKEY";
|
|
||||||
|
|
||||||
public static final String DEFAULT_ACCOUNT_TYPE = "aliyun";
|
|
||||||
|
|
||||||
public static final String TAOBAO_ACCOUNT_TYPE = "taobao";
|
|
||||||
|
|
||||||
public static final String COLUMN_POSITION = "columnPosition";
|
public static final String COLUMN_POSITION = "columnPosition";
|
||||||
|
|
||||||
|
@ -30,8 +30,6 @@ public final class Key {
|
|||||||
//boolean 类型,default:false
|
//boolean 类型,default:false
|
||||||
public final static String EMPTY_AS_NULL = "emptyAsNull";
|
public final static String EMPTY_AS_NULL = "emptyAsNull";
|
||||||
|
|
||||||
public final static String ACCOUNT_TYPE = "accountType";
|
|
||||||
|
|
||||||
public final static String IS_COMPRESS = "isCompress";
|
public final static String IS_COMPRESS = "isCompress";
|
||||||
|
|
||||||
// preSql
|
// preSql
|
||||||
|
@ -62,7 +62,6 @@ public class OdpsWriter extends Writer {
|
|||||||
private String tableName;
|
private String tableName;
|
||||||
private String tunnelServer;
|
private String tunnelServer;
|
||||||
private String partition;
|
private String partition;
|
||||||
private String accountType;
|
|
||||||
private boolean truncate;
|
private boolean truncate;
|
||||||
private String uploadId;
|
private String uploadId;
|
||||||
private TableTunnel.UploadSession masterUpload;
|
private TableTunnel.UploadSession masterUpload;
|
||||||
@ -104,8 +103,6 @@ public class OdpsWriter extends Writer {
|
|||||||
this.tableName = this.originalConfig.getString(Key.TABLE);
|
this.tableName = this.originalConfig.getString(Key.TABLE);
|
||||||
this.tunnelServer = this.originalConfig.getString(Key.TUNNEL_SERVER, null);
|
this.tunnelServer = this.originalConfig.getString(Key.TUNNEL_SERVER, null);
|
||||||
|
|
||||||
this.dealAK();
|
|
||||||
|
|
||||||
// init odps config
|
// init odps config
|
||||||
this.odps = OdpsUtil.initOdpsProject(this.originalConfig);
|
this.odps = OdpsUtil.initOdpsProject(this.originalConfig);
|
||||||
|
|
||||||
@ -153,31 +150,6 @@ public class OdpsWriter extends Writer {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void dealAK() {
|
|
||||||
this.accountType = this.originalConfig.getString(Key.ACCOUNT_TYPE,
|
|
||||||
Constant.DEFAULT_ACCOUNT_TYPE);
|
|
||||||
|
|
||||||
if (!Constant.DEFAULT_ACCOUNT_TYPE.equalsIgnoreCase(this.accountType) &&
|
|
||||||
!Constant.TAOBAO_ACCOUNT_TYPE.equalsIgnoreCase(this.accountType)) {
|
|
||||||
throw DataXException.asDataXException(OdpsWriterErrorCode.ACCOUNT_TYPE_ERROR,
|
|
||||||
MESSAGE_SOURCE.message("odpswriter.1", accountType));
|
|
||||||
}
|
|
||||||
this.originalConfig.set(Key.ACCOUNT_TYPE, this.accountType);
|
|
||||||
|
|
||||||
//检查accessId,accessKey配置
|
|
||||||
if (Constant.DEFAULT_ACCOUNT_TYPE
|
|
||||||
.equalsIgnoreCase(this.accountType)) {
|
|
||||||
this.originalConfig = IdAndKeyUtil.parseAccessIdAndKey(this.originalConfig);
|
|
||||||
String accessId = this.originalConfig.getString(Key.ACCESS_ID);
|
|
||||||
String accessKey = this.originalConfig.getString(Key.ACCESS_KEY);
|
|
||||||
if (IS_DEBUG) {
|
|
||||||
LOG.debug("accessId:[{}], accessKey:[{}] .", accessId,
|
|
||||||
accessKey);
|
|
||||||
}
|
|
||||||
LOG.info("accessId:[{}] .", accessId);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void dealDynamicPartition() {
|
private void dealDynamicPartition() {
|
||||||
/*
|
/*
|
||||||
* 如果显示配置了 supportDynamicPartition,则以配置为准
|
* 如果显示配置了 supportDynamicPartition,则以配置为准
|
||||||
@ -241,20 +213,6 @@ public class OdpsWriter extends Writer {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void prepare() {
|
public void prepare() {
|
||||||
String accessId = null;
|
|
||||||
String accessKey = null;
|
|
||||||
if (Constant.DEFAULT_ACCOUNT_TYPE
|
|
||||||
.equalsIgnoreCase(this.accountType)) {
|
|
||||||
this.originalConfig = IdAndKeyUtil.parseAccessIdAndKey(this.originalConfig);
|
|
||||||
accessId = this.originalConfig.getString(Key.ACCESS_ID);
|
|
||||||
accessKey = this.originalConfig.getString(Key.ACCESS_KEY);
|
|
||||||
if (IS_DEBUG) {
|
|
||||||
LOG.debug("accessId:[{}], accessKey:[{}] .", accessId,
|
|
||||||
accessKey);
|
|
||||||
}
|
|
||||||
LOG.info("accessId:[{}] .", accessId);
|
|
||||||
}
|
|
||||||
|
|
||||||
// init odps config
|
// init odps config
|
||||||
this.odps = OdpsUtil.initOdpsProject(this.originalConfig);
|
this.odps = OdpsUtil.initOdpsProject(this.originalConfig);
|
||||||
|
|
||||||
|
@ -1,65 +0,0 @@
|
|||||||
/**
|
|
||||||
* (C) 2010-2022 Alibaba Group Holding Limited.
|
|
||||||
*
|
|
||||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
|
||||||
* you may not use this file except in compliance with the License.
|
|
||||||
* You may obtain a copy of the License at
|
|
||||||
*
|
|
||||||
* http://www.apache.org/licenses/LICENSE-2.0
|
|
||||||
*
|
|
||||||
* Unless required by applicable law or agreed to in writing, software
|
|
||||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
|
||||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
|
||||||
* See the License for the specific language governing permissions and
|
|
||||||
* limitations under the License.
|
|
||||||
*/
|
|
||||||
|
|
||||||
package com.alibaba.datax.plugin.writer.odpswriter.util;
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.exception.DataXException;
|
|
||||||
import com.alibaba.datax.common.util.Configuration;
|
|
||||||
import com.alibaba.datax.common.util.IdAndKeyRollingUtil;
|
|
||||||
import com.alibaba.datax.common.util.MessageSource;
|
|
||||||
import com.alibaba.datax.plugin.writer.odpswriter.Key;
|
|
||||||
import com.alibaba.datax.plugin.writer.odpswriter.OdpsWriterErrorCode;
|
|
||||||
|
|
||||||
import org.apache.commons.lang3.StringUtils;
|
|
||||||
import org.slf4j.Logger;
|
|
||||||
import org.slf4j.LoggerFactory;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
public class IdAndKeyUtil {
|
|
||||||
private static Logger LOG = LoggerFactory.getLogger(IdAndKeyUtil.class);
|
|
||||||
private static final MessageSource MESSAGE_SOURCE = MessageSource.loadResourceBundle(IdAndKeyUtil.class);
|
|
||||||
|
|
||||||
public static Configuration parseAccessIdAndKey(Configuration originalConfig) {
|
|
||||||
String accessId = originalConfig.getString(Key.ACCESS_ID);
|
|
||||||
String accessKey = originalConfig.getString(Key.ACCESS_KEY);
|
|
||||||
|
|
||||||
// 只要 accessId,accessKey 二者配置了一个,就理解为是用户本意是要直接手动配置其 accessid/accessKey
|
|
||||||
if (StringUtils.isNotBlank(accessId) || StringUtils.isNotBlank(accessKey)) {
|
|
||||||
LOG.info("Try to get accessId/accessKey from your config.");
|
|
||||||
//通过如下语句,进行检查是否确实配置了
|
|
||||||
accessId = originalConfig.getNecessaryValue(Key.ACCESS_ID, OdpsWriterErrorCode.REQUIRED_VALUE);
|
|
||||||
accessKey = originalConfig.getNecessaryValue(Key.ACCESS_KEY, OdpsWriterErrorCode.REQUIRED_VALUE);
|
|
||||||
//检查完毕,返回即可
|
|
||||||
return originalConfig;
|
|
||||||
} else {
|
|
||||||
Map<String, String> envProp = System.getenv();
|
|
||||||
return getAccessIdAndKeyFromEnv(originalConfig, envProp);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private static Configuration getAccessIdAndKeyFromEnv(Configuration originalConfig,
|
|
||||||
Map<String, String> envProp) {
|
|
||||||
// 如果获取到ak,在getAccessIdAndKeyFromEnv中已经设置到originalConfig了
|
|
||||||
String accessKey = IdAndKeyRollingUtil.getAccessIdAndKeyFromEnv(originalConfig);
|
|
||||||
if (StringUtils.isBlank(accessKey)) {
|
|
||||||
// 无处获取(既没有配置在作业中,也没用在环境变量中)
|
|
||||||
throw DataXException.asDataXException(OdpsWriterErrorCode.GET_ID_KEY_FAIL,
|
|
||||||
MESSAGE_SOURCE.message("idandkeyutil.2"));
|
|
||||||
}
|
|
||||||
return originalConfig;
|
|
||||||
}
|
|
||||||
}
|
|
@ -79,7 +79,6 @@ public class OdpsUtil {
|
|||||||
|
|
||||||
|
|
||||||
public static Odps initOdpsProject(Configuration originalConfig) {
|
public static Odps initOdpsProject(Configuration originalConfig) {
|
||||||
String accountType = originalConfig.getString(Key.ACCOUNT_TYPE);
|
|
||||||
String accessId = originalConfig.getString(Key.ACCESS_ID);
|
String accessId = originalConfig.getString(Key.ACCESS_ID);
|
||||||
String accessKey = originalConfig.getString(Key.ACCESS_KEY);
|
String accessKey = originalConfig.getString(Key.ACCESS_KEY);
|
||||||
|
|
||||||
@ -88,15 +87,10 @@ public class OdpsUtil {
|
|||||||
String securityToken = originalConfig.getString(Key.SECURITY_TOKEN);
|
String securityToken = originalConfig.getString(Key.SECURITY_TOKEN);
|
||||||
|
|
||||||
Account account;
|
Account account;
|
||||||
if (accountType.equalsIgnoreCase(Constant.DEFAULT_ACCOUNT_TYPE)) {
|
if (StringUtils.isNotBlank(securityToken)) {
|
||||||
if (StringUtils.isNotBlank(securityToken)) {
|
account = new com.aliyun.odps.account.StsAccount(accessId, accessKey, securityToken);
|
||||||
account = new com.aliyun.odps.account.StsAccount(accessId, accessKey, securityToken);
|
|
||||||
} else {
|
|
||||||
account = new AliyunAccount(accessId, accessKey);
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
throw DataXException.asDataXException(OdpsWriterErrorCode.ACCOUNT_TYPE_ERROR,
|
account = new AliyunAccount(accessId, accessKey);
|
||||||
MESSAGE_SOURCE.message("odpsutil.4", accountType));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
Odps odps = new Odps(account);
|
Odps odps = new Odps(account);
|
||||||
|
2
pom.xml
2
pom.xml
@ -22,7 +22,7 @@
|
|||||||
<commons-lang3-version>3.3.2</commons-lang3-version>
|
<commons-lang3-version>3.3.2</commons-lang3-version>
|
||||||
<commons-configuration-version>1.10</commons-configuration-version>
|
<commons-configuration-version>1.10</commons-configuration-version>
|
||||||
<commons-cli-version>1.2</commons-cli-version>
|
<commons-cli-version>1.2</commons-cli-version>
|
||||||
<fastjson-version>2.0.19</fastjson-version>
|
<fastjson-version>2.0.23</fastjson-version>
|
||||||
<guava-version>16.0.1</guava-version>
|
<guava-version>16.0.1</guava-version>
|
||||||
<diamond.version>3.7.2.1-SNAPSHOT</diamond.version>
|
<diamond.version>3.7.2.1-SNAPSHOT</diamond.version>
|
||||||
|
|
||||||
|
@ -64,7 +64,6 @@
|
|||||||
<dependency>
|
<dependency>
|
||||||
<groupId>com.alibaba.fastjson2</groupId>
|
<groupId>com.alibaba.fastjson2</groupId>
|
||||||
<artifactId>fastjson2</artifactId>
|
<artifactId>fastjson2</artifactId>
|
||||||
<version>2.0.23</version>
|
|
||||||
</dependency>
|
</dependency>
|
||||||
<dependency>
|
<dependency>
|
||||||
<groupId>mysql</groupId>
|
<groupId>mysql</groupId>
|
||||||
|
Loading…
Reference in New Issue
Block a user