diff --git a/README.md b/README.md index 2c5e95f4..01bbc3ea 100644 --- a/README.md +++ b/README.md @@ -26,7 +26,7 @@ DataX本身作为数据同步框架,将不同数据源的同步抽象为从源 # Quick Start -##### Download [DataX下载地址](https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202210/datax.tar.gz) +##### Download [DataX下载地址](https://datax-opensource.oss-cn-hangzhou.aliyuncs.com/202303/datax.tar.gz) ##### 请点击:[Quick Start](https://github.com/alibaba/DataX/blob/master/userGuid.md) @@ -70,6 +70,7 @@ DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、N | | Databend | | √ | [写](https://github.com/alibaba/DataX/blob/master/databendwriter/doc/databendwriter.md) | | | Hive | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | | | kudu | | √ | [写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | +| | selectdb | | √ | [写](https://github.com/alibaba/DataX/blob/master/selectdbwriter/doc/selectdbwriter.md) | | 无结构化数据存储 | TxtFile | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md) | | | FTP | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/ftpreader/doc/ftpreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/ftpwriter/doc/ftpwriter.md) | | | HDFS | √ | √ | [读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md) | @@ -108,6 +109,12 @@ DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、N DataX 后续计划月度迭代更新,也欢迎感兴趣的同学提交 Pull requests,月度更新内容会介绍介绍如下。 +- [datax_v202303](https://github.com/alibaba/DataX/releases/tag/datax_v202303) + - 精简代码 + - 新增插件(adbmysqlwriter、databendwriter、selectdbwriter) + - 优化插件、修复问题(sqlserver、hdfs、cassandra、kudu、oss) + - fastjson 升级到 fastjson2 + - [datax_v202210](https://github.com/alibaba/DataX/releases/tag/datax_v202210) - 涉及通道能力更新(OceanBase、Tdengine、Doris等) diff --git a/adswriter/doc/adswriter.md b/adswriter/doc/adswriter.md index 4a0fd961..c02f8018 100644 --- a/adswriter/doc/adswriter.md +++ b/adswriter/doc/adswriter.md @@ -110,7 +110,6 @@ DataX 将数据直连ADS接口,利用ADS暴露的INSERT接口直写到ADS。 "account": "xxx@aliyun.com", "odpsServer": "xxx", "tunnelServer": "xxx", - "accountType": "aliyun", "project": "transfer_project" }, "writeMode": "load", diff --git a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/load/TransferProjectConf.java b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/load/TransferProjectConf.java index bff4b7b9..3d28a833 100644 --- a/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/load/TransferProjectConf.java +++ b/adswriter/src/main/java/com/alibaba/datax/plugin/writer/adswriter/load/TransferProjectConf.java @@ -12,7 +12,6 @@ public class TransferProjectConf { public final static String KEY_ACCOUNT = "odps.account"; public final static String KEY_ODPS_SERVER = "odps.odpsServer"; public final static String KEY_ODPS_TUNNEL = "odps.tunnelServer"; - public final static String KEY_ACCOUNT_TYPE = "odps.accountType"; public final static String KEY_PROJECT = "odps.project"; private String accessId; @@ -20,7 +19,6 @@ public class TransferProjectConf { private String account; private String odpsServer; private String odpsTunnel; - private String accountType; private String project; public static TransferProjectConf create(Configuration adsWriterConf) { @@ -30,7 +28,6 @@ public class TransferProjectConf { res.account = adsWriterConf.getString(KEY_ACCOUNT); res.odpsServer = adsWriterConf.getString(KEY_ODPS_SERVER); res.odpsTunnel = adsWriterConf.getString(KEY_ODPS_TUNNEL); - res.accountType = adsWriterConf.getString(KEY_ACCOUNT_TYPE, "aliyun"); res.project = adsWriterConf.getString(KEY_PROJECT); return res; } @@ -55,9 +52,6 @@ public class TransferProjectConf { return odpsTunnel; } - public String getAccountType() { - return accountType; - } public String getProject() { return project; diff --git a/common/src/main/java/com/alibaba/datax/common/statistics/PerfTrace.java b/common/src/main/java/com/alibaba/datax/common/statistics/PerfTrace.java index ea9aa421..cf0457bc 100644 --- a/common/src/main/java/com/alibaba/datax/common/statistics/PerfTrace.java +++ b/common/src/main/java/com/alibaba/datax/common/statistics/PerfTrace.java @@ -31,7 +31,6 @@ public class PerfTrace { private int taskGroupId; private int channelNumber; - private int priority; private int batchSize = 500; private volatile boolean perfReportEnable = true; @@ -54,12 +53,12 @@ public class PerfTrace { * @param taskGroupId * @return */ - public static PerfTrace getInstance(boolean isJob, long jobId, int taskGroupId, int priority, boolean enable) { + public static PerfTrace getInstance(boolean isJob, long jobId, int taskGroupId, boolean enable) { if (instance == null) { synchronized (lock) { if (instance == null) { - instance = new PerfTrace(isJob, jobId, taskGroupId, priority, enable); + instance = new PerfTrace(isJob, jobId, taskGroupId, enable); } } } @@ -76,22 +75,21 @@ public class PerfTrace { LOG.error("PerfTrace instance not be init! must have some error! "); synchronized (lock) { if (instance == null) { - instance = new PerfTrace(false, -1111, -1111, 0, false); + instance = new PerfTrace(false, -1111, -1111, false); } } } return instance; } - private PerfTrace(boolean isJob, long jobId, int taskGroupId, int priority, boolean enable) { + private PerfTrace(boolean isJob, long jobId, int taskGroupId, boolean enable) { try { this.perfTraceId = isJob ? "job_" + jobId : String.format("taskGroup_%s_%s", jobId, taskGroupId); this.enable = enable; this.isJob = isJob; this.taskGroupId = taskGroupId; this.instId = jobId; - this.priority = priority; - LOG.info(String.format("PerfTrace traceId=%s, isEnable=%s, priority=%s", this.perfTraceId, this.enable, this.priority)); + LOG.info(String.format("PerfTrace traceId=%s, isEnable=%s", this.perfTraceId, this.enable)); } catch (Exception e) { // do nothing @@ -398,7 +396,6 @@ public class PerfTrace { jdo.setWindowEnd(this.windowEnd); jdo.setJobStartTime(jobStartTime); jdo.setJobRunTimeMs(System.currentTimeMillis() - jobStartTime.getTime()); - jdo.setJobPriority(this.priority); jdo.setChannelNum(this.channelNumber); jdo.setCluster(this.cluster); jdo.setJobDomain(this.jobDomain); @@ -609,7 +606,6 @@ public class PerfTrace { private Date jobStartTime; private Date jobEndTime; private Long jobRunTimeMs; - private Integer jobPriority; private Integer channelNum; private String cluster; private String jobDomain; @@ -680,10 +676,6 @@ public class PerfTrace { return jobRunTimeMs; } - public Integer getJobPriority() { - return jobPriority; - } - public Integer getChannelNum() { return channelNum; } @@ -816,10 +808,6 @@ public class PerfTrace { this.jobRunTimeMs = jobRunTimeMs; } - public void setJobPriority(Integer jobPriority) { - this.jobPriority = jobPriority; - } - public void setChannelNum(Integer channelNum) { this.channelNum = channelNum; } diff --git a/common/src/main/java/com/alibaba/datax/common/util/IdAndKeyRollingUtil.java b/common/src/main/java/com/alibaba/datax/common/util/IdAndKeyRollingUtil.java deleted file mode 100644 index 8bab301e..00000000 --- a/common/src/main/java/com/alibaba/datax/common/util/IdAndKeyRollingUtil.java +++ /dev/null @@ -1,62 +0,0 @@ -package com.alibaba.datax.common.util; - -import java.util.Map; - -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.alibaba.datax.common.exception.DataXException; - -public class IdAndKeyRollingUtil { - private static Logger LOGGER = LoggerFactory.getLogger(IdAndKeyRollingUtil.class); - public static final String SKYNET_ACCESSID = "SKYNET_ACCESSID"; - public static final String SKYNET_ACCESSKEY = "SKYNET_ACCESSKEY"; - - public final static String ACCESS_ID = "accessId"; - public final static String ACCESS_KEY = "accessKey"; - - public static String parseAkFromSkynetAccessKey() { - Map envProp = System.getenv(); - String skynetAccessID = envProp.get(IdAndKeyRollingUtil.SKYNET_ACCESSID); - String skynetAccessKey = envProp.get(IdAndKeyRollingUtil.SKYNET_ACCESSKEY); - String accessKey = null; - // follow 原有的判断条件 - // 环境变量中,如果存在SKYNET_ACCESSID/SKYNET_ACCESSKEy(只要有其中一个变量,则认为一定是两个都存在的! - // if (StringUtils.isNotBlank(skynetAccessID) || - // StringUtils.isNotBlank(skynetAccessKey)) { - // 检查严格,只有加密串不为空的时候才进去,不过 之前能跑的加密串都不应该为空 - if (StringUtils.isNotBlank(skynetAccessKey)) { - LOGGER.info("Try to get accessId/accessKey from environment SKYNET_ACCESSKEY."); - accessKey = DESCipher.decrypt(skynetAccessKey); - if (StringUtils.isBlank(accessKey)) { - // 环境变量里面有,但是解析不到 - throw DataXException.asDataXException(String.format( - "Failed to get the [accessId]/[accessKey] from the environment variable. The [accessId]=[%s]", - skynetAccessID)); - } - } - if (StringUtils.isNotBlank(accessKey)) { - LOGGER.info("Get accessId/accessKey from environment variables SKYNET_ACCESSKEY successfully."); - } - return accessKey; - } - - public static String getAccessIdAndKeyFromEnv(Configuration originalConfig) { - String accessId = null; - Map envProp = System.getenv(); - accessId = envProp.get(IdAndKeyRollingUtil.SKYNET_ACCESSID); - String accessKey = null; - if (StringUtils.isBlank(accessKey)) { - // 老的没有出异常,只是获取不到ak - accessKey = IdAndKeyRollingUtil.parseAkFromSkynetAccessKey(); - } - - if (StringUtils.isNotBlank(accessKey)) { - // 确认使用这个的都是 accessId、accessKey的命名习惯 - originalConfig.set(IdAndKeyRollingUtil.ACCESS_ID, accessId); - originalConfig.set(IdAndKeyRollingUtil.ACCESS_KEY, accessKey); - } - return accessKey; - } -} diff --git a/core/src/main/java/com/alibaba/datax/core/Engine.java b/core/src/main/java/com/alibaba/datax/core/Engine.java index 38342532..4ba9fc18 100755 --- a/core/src/main/java/com/alibaba/datax/core/Engine.java +++ b/core/src/main/java/com/alibaba/datax/core/Engine.java @@ -79,16 +79,9 @@ public class Engine { perfReportEnable = false; } - int priority = 0; - try { - priority = Integer.parseInt(System.getenv("SKYNET_PRIORITY")); - }catch (NumberFormatException e){ - LOG.warn("prioriy set to 0, because NumberFormatException, the value is: "+System.getProperty("PROIORY")); - } - Configuration jobInfoConfig = allConf.getConfiguration(CoreConstant.DATAX_JOB_JOBINFO); //初始化PerfTrace - PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, priority, traceEnable); + PerfTrace perfTrace = PerfTrace.getInstance(isJob, instanceId, taskGroupId, traceEnable); perfTrace.setJobInfo(jobInfoConfig,perfReportEnable,channelNumber); container.start(); diff --git a/databendwriter/pom.xml b/databendwriter/pom.xml index aefc26fa..9ddc735c 100644 --- a/databendwriter/pom.xml +++ b/databendwriter/pom.xml @@ -98,4 +98,4 @@ - \ No newline at end of file + diff --git a/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ElasticSearchClient.java b/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ElasticSearchClient.java index 869c33b9..08486e1f 100644 --- a/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ElasticSearchClient.java +++ b/elasticsearchwriter/src/main/java/com/alibaba/datax/plugin/writer/elasticsearchwriter/ElasticSearchClient.java @@ -53,6 +53,8 @@ public class ElasticSearchClient { public ElasticSearchClient(Configuration conf) { this.conf = conf; String endpoint = Key.getEndpoint(conf); + //es是支持集群写入的 + String[] endpoints = endpoint.split(","); String user = Key.getUsername(conf); String passwd = Key.getPassword(conf); boolean multiThread = Key.isMultiThread(conf); @@ -63,7 +65,7 @@ public class ElasticSearchClient { int totalConnection = this.conf.getInt("maxTotalConnection", 200); JestClientFactory factory = new JestClientFactory(); Builder httpClientConfig = new HttpClientConfig - .Builder(endpoint) + .Builder(Arrays.asList(endpoints)) // .setPreemptiveAuth(new HttpHost(endpoint)) .multiThreaded(multiThread) .connTimeout(readTimeout) diff --git a/ftpwriter/doc/ftpwriter.md b/ftpwriter/doc/ftpwriter.md index 6b1b2687..a38a1052 100644 --- a/ftpwriter/doc/ftpwriter.md +++ b/ftpwriter/doc/ftpwriter.md @@ -24,7 +24,7 @@ FtpWriter实现了从DataX协议转为FTP文件功能,FTP文件本身是无结 我们不能做到: -1. 单个文件不能支持并发写入。 +1. 单个文件并发写入。 ## 3 功能说明 diff --git a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java index 4bfe5d1b..5ba572e1 100644 --- a/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java +++ b/hdfsreader/src/main/java/com/alibaba/datax/plugin/reader/hdfsreader/DFSUtil.java @@ -331,26 +331,30 @@ public class DFSUtil { //If the network disconnected, will retry 45 times, each time the retry interval for 20 seconds //Each file as a split //TODO multy threads - InputSplit[] splits = in.getSplits(conf, 1); + // OrcInputFormat getSplits params numSplits not used, splits size = block numbers + InputSplit[] splits = in.getSplits(conf, -1); + for (InputSplit split : splits) { + { + RecordReader reader = in.getRecordReader(split, conf, Reporter.NULL); + Object key = reader.createKey(); + Object value = reader.createValue(); + // 获取列信息 + List fields = inspector.getAllStructFieldRefs(); - RecordReader reader = in.getRecordReader(splits[0], conf, Reporter.NULL); - Object key = reader.createKey(); - Object value = reader.createValue(); - // 获取列信息 - List fields = inspector.getAllStructFieldRefs(); + List recordFields; + while (reader.next(key, value)) { + recordFields = new ArrayList(); - List recordFields; - while (reader.next(key, value)) { - recordFields = new ArrayList(); - - for (int i = 0; i <= columnIndexMax; i++) { - Object field = inspector.getStructFieldData(value, fields.get(i)); - recordFields.add(field); + for (int i = 0; i <= columnIndexMax; i++) { + Object field = inspector.getStructFieldData(value, fields.get(i)); + recordFields.add(field); + } + transportOneRecord(column, recordFields, recordSender, + taskPluginCollector, isReadAllColumns, nullFormat); + } + reader.close(); } - transportOneRecord(column, recordFields, recordSender, - taskPluginCollector, isReadAllColumns, nullFormat); } - reader.close(); } catch (Exception e) { String message = String.format("从orcfile文件路径[%s]中读取数据发生异常,请联系系统管理员。" , sourceOrcFilePath); diff --git a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java index bff3509f..df872842 100644 --- a/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java +++ b/kuduwriter/src/main/java/com/q1/datax/plugin/writer/kudu11xwriter/KuduWriterTask.java @@ -134,7 +134,7 @@ public class KuduWriterTask { break; case BOOLEAN: synchronized (lock) { - row.addBoolean(name, Boolean.getBoolean(rawData)); + row.addBoolean(name, Boolean.parseBoolean(rawData)); } break; case STRING: diff --git a/mongodbreader/doc/mongodbreader.md b/mongodbreader/doc/mongodbreader.md index 99d25731..297e598c 100644 --- a/mongodbreader/doc/mongodbreader.md +++ b/mongodbreader/doc/mongodbreader.md @@ -114,8 +114,7 @@ MongoDBReader通过Datax框架从MongoDB并行的读取数据,通过主控的J "accessKey": "********************", "truncate": true, "odpsServer": "xxx/api", - "tunnelServer": "xxx", - "accountType": "aliyun" + "tunnelServer": "xxx" } } } diff --git a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/Constant.java b/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/Constant.java index dee2ef5c..cf34762d 100755 --- a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/Constant.java +++ b/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/Constant.java @@ -14,20 +14,9 @@ public class Constant { public static final String PARTITION_SPLIT_MODE = "partition"; - public static final String DEFAULT_ACCOUNT_TYPE = "aliyun"; - - public static final String TAOBAO_ACCOUNT_TYPE = "taobao"; - // 常量字段用COLUMN_CONSTANT_FLAG 首尾包住即可 public final static String COLUMN_CONSTANT_FLAG = "'"; - /** - * 以下是获取accesskey id 需要用到的常量值 - */ - public static final String SKYNET_ACCESSID = "SKYNET_ACCESSID"; - - public static final String SKYNET_ACCESSKEY = "SKYNET_ACCESSKEY"; - public static final String PARTITION_COLUMNS = "partitionColumns"; public static final String PARSED_COLUMNS = "parsedColumns"; diff --git a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/Key.java b/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/Key.java index 2cee65d1..6f8c7d92 100755 --- a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/Key.java +++ b/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/Key.java @@ -24,9 +24,6 @@ public class Key { // 当值为:partition 则只切分到分区;当值为:record,则当按照分区切分后达不到adviceNum时,继续按照record切分 public final static String SPLIT_MODE = "splitMode"; - // 账号类型,默认为aliyun,也可能为taobao等其他类型 - public final static String ACCOUNT_TYPE = "accountType"; - public final static String PACKAGE_AUTHORIZED_PROJECT = "packageAuthorizedProject"; public final static String IS_COMPRESS = "isCompress"; diff --git a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/OdpsReader.java b/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/OdpsReader.java index 206b0135..615cee50 100755 --- a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/OdpsReader.java +++ b/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/OdpsReader.java @@ -42,12 +42,6 @@ public class OdpsReader extends Reader { this.originalConfig = super.getPluginJobConf(); this.successOnNoPartition = this.originalConfig.getBool(Key.SUCCESS_ON_NO_PATITION, false); - //如果用户没有配置accessId/accessKey,尝试从环境变量获取 - String accountType = originalConfig.getString(Key.ACCOUNT_TYPE, Constant.DEFAULT_ACCOUNT_TYPE); - if (Constant.DEFAULT_ACCOUNT_TYPE.equalsIgnoreCase(accountType)) { - this.originalConfig = IdAndKeyUtil.parseAccessIdAndKey(this.originalConfig); - } - //检查必要的参数配置 OdpsUtil.checkNecessaryConfig(this.originalConfig); //重试次数的配置检查 diff --git a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/util/IdAndKeyUtil.java b/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/util/IdAndKeyUtil.java deleted file mode 100644 index 05722b59..00000000 --- a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/util/IdAndKeyUtil.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * (C) 2010-2022 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.datax.plugin.reader.odpsreader.util; - -import com.alibaba.datax.common.exception.DataXException; -import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.common.util.IdAndKeyRollingUtil; -import com.alibaba.datax.common.util.MessageSource; -import com.alibaba.datax.plugin.reader.odpsreader.Key; -import com.alibaba.datax.plugin.reader.odpsreader.OdpsReaderErrorCode; - -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -public class IdAndKeyUtil { - private static Logger LOG = LoggerFactory.getLogger(IdAndKeyUtil.class); - private static MessageSource MESSAGE_SOURCE = MessageSource.loadResourceBundle(IdAndKeyUtil.class); - - public static Configuration parseAccessIdAndKey(Configuration originalConfig) { - String accessId = originalConfig.getString(Key.ACCESS_ID); - String accessKey = originalConfig.getString(Key.ACCESS_KEY); - - // 只要 accessId,accessKey 二者配置了一个,就理解为是用户本意是要直接手动配置其 accessid/accessKey - if (StringUtils.isNotBlank(accessId) || StringUtils.isNotBlank(accessKey)) { - LOG.info("Try to get accessId/accessKey from your config."); - //通过如下语句,进行检查是否确实配置了 - accessId = originalConfig.getNecessaryValue(Key.ACCESS_ID, OdpsReaderErrorCode.REQUIRED_VALUE); - accessKey = originalConfig.getNecessaryValue(Key.ACCESS_KEY, OdpsReaderErrorCode.REQUIRED_VALUE); - //检查完毕,返回即可 - return originalConfig; - } else { - Map envProp = System.getenv(); - return getAccessIdAndKeyFromEnv(originalConfig, envProp); - } - } - - private static Configuration getAccessIdAndKeyFromEnv(Configuration originalConfig, - Map envProp) { - // 如果获取到ak,在getAccessIdAndKeyFromEnv中已经设置到originalConfig了 - String accessKey = IdAndKeyRollingUtil.getAccessIdAndKeyFromEnv(originalConfig); - if (StringUtils.isBlank(accessKey)) { - // 无处获取(既没有配置在作业中,也没用在环境变量中) - throw DataXException.asDataXException(OdpsReaderErrorCode.GET_ID_KEY_FAIL, - MESSAGE_SOURCE.message("idandkeyutil.2")); - } - return originalConfig; - } -} diff --git a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/util/OdpsUtil.java b/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/util/OdpsUtil.java index f2ad8e0f..0ff34a81 100755 --- a/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/util/OdpsUtil.java +++ b/odpsreader/src/main/java/com/alibaba/datax/plugin/reader/odpsreader/util/OdpsUtil.java @@ -76,19 +76,12 @@ public final class OdpsUtil { defaultProject = packageAuthorizedProject; } - String accountType = originalConfig.getString(Key.ACCOUNT_TYPE, - Constant.DEFAULT_ACCOUNT_TYPE); Account account = null; - if (accountType.equalsIgnoreCase(Constant.DEFAULT_ACCOUNT_TYPE)) { - if (StringUtils.isNotBlank(securityToken)) { - account = new StsAccount(accessId, accessKey, securityToken); - } else { - account = new AliyunAccount(accessId, accessKey); - } + if (StringUtils.isNotBlank(securityToken)) { + account = new StsAccount(accessId, accessKey, securityToken); } else { - throw DataXException.asDataXException(OdpsReaderErrorCode.ACCOUNT_TYPE_ERROR, - MESSAGE_SOURCE.message("odpsutil.3", accountType)); + account = new AliyunAccount(accessId, accessKey); } Odps odps = new Odps(account); diff --git a/odpswriter/doc/odpswriter.md b/odpswriter/doc/odpswriter.md index d81672b0..845dd1d3 100644 --- a/odpswriter/doc/odpswriter.md +++ b/odpswriter/doc/odpswriter.md @@ -71,8 +71,7 @@ ODPSWriter插件用于实现往ODPS插入或者更新数据,主要提供给etl "accessKey": "xxxx", "truncate": true, "odpsServer": "http://sxxx/api", - "tunnelServer": "http://xxx", - "accountType": "aliyun" + "tunnelServer": "http://xxx" } } } diff --git a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/Constant.java b/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/Constant.java index f4d9734b..efedfea9 100755 --- a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/Constant.java +++ b/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/Constant.java @@ -2,13 +2,6 @@ package com.alibaba.datax.plugin.writer.odpswriter; public class Constant { - public static final String SKYNET_ACCESSID = "SKYNET_ACCESSID"; - - public static final String SKYNET_ACCESSKEY = "SKYNET_ACCESSKEY"; - - public static final String DEFAULT_ACCOUNT_TYPE = "aliyun"; - - public static final String TAOBAO_ACCOUNT_TYPE = "taobao"; public static final String COLUMN_POSITION = "columnPosition"; diff --git a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/Key.java b/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/Key.java index 7ee11128..8dff8a4c 100755 --- a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/Key.java +++ b/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/Key.java @@ -30,8 +30,6 @@ public final class Key { //boolean 类型,default:false public final static String EMPTY_AS_NULL = "emptyAsNull"; - public final static String ACCOUNT_TYPE = "accountType"; - public final static String IS_COMPRESS = "isCompress"; // preSql diff --git a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/OdpsWriter.java b/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/OdpsWriter.java index 06476463..9b7276fa 100755 --- a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/OdpsWriter.java +++ b/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/OdpsWriter.java @@ -62,7 +62,6 @@ public class OdpsWriter extends Writer { private String tableName; private String tunnelServer; private String partition; - private String accountType; private boolean truncate; private String uploadId; private TableTunnel.UploadSession masterUpload; @@ -104,8 +103,6 @@ public class OdpsWriter extends Writer { this.tableName = this.originalConfig.getString(Key.TABLE); this.tunnelServer = this.originalConfig.getString(Key.TUNNEL_SERVER, null); - this.dealAK(); - // init odps config this.odps = OdpsUtil.initOdpsProject(this.originalConfig); @@ -153,31 +150,6 @@ public class OdpsWriter extends Writer { } } - private void dealAK() { - this.accountType = this.originalConfig.getString(Key.ACCOUNT_TYPE, - Constant.DEFAULT_ACCOUNT_TYPE); - - if (!Constant.DEFAULT_ACCOUNT_TYPE.equalsIgnoreCase(this.accountType) && - !Constant.TAOBAO_ACCOUNT_TYPE.equalsIgnoreCase(this.accountType)) { - throw DataXException.asDataXException(OdpsWriterErrorCode.ACCOUNT_TYPE_ERROR, - MESSAGE_SOURCE.message("odpswriter.1", accountType)); - } - this.originalConfig.set(Key.ACCOUNT_TYPE, this.accountType); - - //检查accessId,accessKey配置 - if (Constant.DEFAULT_ACCOUNT_TYPE - .equalsIgnoreCase(this.accountType)) { - this.originalConfig = IdAndKeyUtil.parseAccessIdAndKey(this.originalConfig); - String accessId = this.originalConfig.getString(Key.ACCESS_ID); - String accessKey = this.originalConfig.getString(Key.ACCESS_KEY); - if (IS_DEBUG) { - LOG.debug("accessId:[{}], accessKey:[{}] .", accessId, - accessKey); - } - LOG.info("accessId:[{}] .", accessId); - } - } - private void dealDynamicPartition() { /* * 如果显示配置了 supportDynamicPartition,则以配置为准 @@ -241,20 +213,6 @@ public class OdpsWriter extends Writer { @Override public void prepare() { - String accessId = null; - String accessKey = null; - if (Constant.DEFAULT_ACCOUNT_TYPE - .equalsIgnoreCase(this.accountType)) { - this.originalConfig = IdAndKeyUtil.parseAccessIdAndKey(this.originalConfig); - accessId = this.originalConfig.getString(Key.ACCESS_ID); - accessKey = this.originalConfig.getString(Key.ACCESS_KEY); - if (IS_DEBUG) { - LOG.debug("accessId:[{}], accessKey:[{}] .", accessId, - accessKey); - } - LOG.info("accessId:[{}] .", accessId); - } - // init odps config this.odps = OdpsUtil.initOdpsProject(this.originalConfig); diff --git a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/util/IdAndKeyUtil.java b/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/util/IdAndKeyUtil.java deleted file mode 100755 index 98c9afdd..00000000 --- a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/util/IdAndKeyUtil.java +++ /dev/null @@ -1,65 +0,0 @@ -/** - * (C) 2010-2022 Alibaba Group Holding Limited. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.alibaba.datax.plugin.writer.odpswriter.util; - -import com.alibaba.datax.common.exception.DataXException; -import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.common.util.IdAndKeyRollingUtil; -import com.alibaba.datax.common.util.MessageSource; -import com.alibaba.datax.plugin.writer.odpswriter.Key; -import com.alibaba.datax.plugin.writer.odpswriter.OdpsWriterErrorCode; - -import org.apache.commons.lang3.StringUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.Map; - -public class IdAndKeyUtil { - private static Logger LOG = LoggerFactory.getLogger(IdAndKeyUtil.class); - private static final MessageSource MESSAGE_SOURCE = MessageSource.loadResourceBundle(IdAndKeyUtil.class); - - public static Configuration parseAccessIdAndKey(Configuration originalConfig) { - String accessId = originalConfig.getString(Key.ACCESS_ID); - String accessKey = originalConfig.getString(Key.ACCESS_KEY); - - // 只要 accessId,accessKey 二者配置了一个,就理解为是用户本意是要直接手动配置其 accessid/accessKey - if (StringUtils.isNotBlank(accessId) || StringUtils.isNotBlank(accessKey)) { - LOG.info("Try to get accessId/accessKey from your config."); - //通过如下语句,进行检查是否确实配置了 - accessId = originalConfig.getNecessaryValue(Key.ACCESS_ID, OdpsWriterErrorCode.REQUIRED_VALUE); - accessKey = originalConfig.getNecessaryValue(Key.ACCESS_KEY, OdpsWriterErrorCode.REQUIRED_VALUE); - //检查完毕,返回即可 - return originalConfig; - } else { - Map envProp = System.getenv(); - return getAccessIdAndKeyFromEnv(originalConfig, envProp); - } - } - - private static Configuration getAccessIdAndKeyFromEnv(Configuration originalConfig, - Map envProp) { - // 如果获取到ak,在getAccessIdAndKeyFromEnv中已经设置到originalConfig了 - String accessKey = IdAndKeyRollingUtil.getAccessIdAndKeyFromEnv(originalConfig); - if (StringUtils.isBlank(accessKey)) { - // 无处获取(既没有配置在作业中,也没用在环境变量中) - throw DataXException.asDataXException(OdpsWriterErrorCode.GET_ID_KEY_FAIL, - MESSAGE_SOURCE.message("idandkeyutil.2")); - } - return originalConfig; - } -} diff --git a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/util/OdpsUtil.java b/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/util/OdpsUtil.java index a663da85..a3a372af 100755 --- a/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/util/OdpsUtil.java +++ b/odpswriter/src/main/java/com/alibaba/datax/plugin/writer/odpswriter/util/OdpsUtil.java @@ -79,7 +79,6 @@ public class OdpsUtil { public static Odps initOdpsProject(Configuration originalConfig) { - String accountType = originalConfig.getString(Key.ACCOUNT_TYPE); String accessId = originalConfig.getString(Key.ACCESS_ID); String accessKey = originalConfig.getString(Key.ACCESS_KEY); @@ -88,15 +87,10 @@ public class OdpsUtil { String securityToken = originalConfig.getString(Key.SECURITY_TOKEN); Account account; - if (accountType.equalsIgnoreCase(Constant.DEFAULT_ACCOUNT_TYPE)) { - if (StringUtils.isNotBlank(securityToken)) { - account = new com.aliyun.odps.account.StsAccount(accessId, accessKey, securityToken); - } else { - account = new AliyunAccount(accessId, accessKey); - } + if (StringUtils.isNotBlank(securityToken)) { + account = new com.aliyun.odps.account.StsAccount(accessId, accessKey, securityToken); } else { - throw DataXException.asDataXException(OdpsWriterErrorCode.ACCOUNT_TYPE_ERROR, - MESSAGE_SOURCE.message("odpsutil.4", accountType)); + account = new AliyunAccount(accessId, accessKey); } Odps odps = new Odps(account); diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/OriginalConfPretreatmentUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/OriginalConfPretreatmentUtil.java index 34d1b3af..556e50ac 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/OriginalConfPretreatmentUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/writer/util/OriginalConfPretreatmentUtil.java @@ -10,6 +10,7 @@ import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.sql.Connection; import java.util.ArrayList; import java.util.List; @@ -120,9 +121,15 @@ public final class OriginalConfPretreatmentUtil { } else { // 确保用户配置的 column 不重复 ListUtil.makeSureNoValueDuplicate(userConfiguredColumns, false); + Connection connection = null; + try { + connection = connectionFactory.getConnecttion(); + // 检查列是否都为数据库表中正确的列(通过执行一次 select column from table 进行判断) + DBUtil.getColumnMetaData(connection, oneTable,StringUtils.join(userConfiguredColumns, ",")); + } finally { + DBUtil.closeDBResources(null, connection); + } - // 检查列是否都为数据库表中正确的列(通过执行一次 select column from table 进行判断) - DBUtil.getColumnMetaData(connectionFactory.getConnecttion(), oneTable,StringUtils.join(userConfiguredColumns, ",")); } } } diff --git a/pom.xml b/pom.xml index bd1c7f3a..957c60ee 100644 --- a/pom.xml +++ b/pom.xml @@ -22,7 +22,7 @@ 3.3.2 1.10 1.2 - 2.0.19 + 2.0.23 16.0.1 3.7.2.1-SNAPSHOT diff --git a/starrockswriter/doc/starrockswriter.md b/starrockswriter/doc/starrockswriter.md index ba94e6af..6ebe3681 100644 --- a/starrockswriter/doc/starrockswriter.md +++ b/starrockswriter/doc/starrockswriter.md @@ -64,13 +64,13 @@ StarRocksWriter 插件实现了写入数据到 StarRocks 主库的目的表的 "column": ["k1", "k2", "v1", "v2"], "preSql": [], "postSql": [], - "connection": [ - { - "table": ["xxx"], - "jdbcUrl": "jdbc:mysql://172.28.17.100:9030/", - "selectedDatabase": "xxxx" - } - ], + "connection": [ + { + "table": ["xxx"], + "jdbcUrl": "jdbc:mysql://172.28.17.100:9030/", + "selectedDatabase": "xxxx" + } + ], "loadUrl": ["172.28.17.100:8030", "172.28.17.100:8030"], "loadProps": {} } diff --git a/starrockswriter/pom.xml b/starrockswriter/pom.xml index 67e3b919..73a51422 100755 --- a/starrockswriter/pom.xml +++ b/starrockswriter/pom.xml @@ -64,7 +64,6 @@ com.alibaba.fastjson2 fastjson2 - 2.0.23 mysql @@ -98,10 +97,6 @@ true - - com.alibaba.fastjson - com.starrocks.shade.com.alibaba.fastjson - org.apache.http com.starrocks.shade.org.apache.http @@ -118,7 +113,6 @@ commons-logging:* org.apache.httpcomponents:httpclient org.apache.httpcomponents:httpcore - com.alibaba:fastjson diff --git a/txtfilereader/src/main/java/com/alibaba/datax/plugin/reader/txtfilereader/TxtFileReader.java b/txtfilereader/src/main/java/com/alibaba/datax/plugin/reader/txtfilereader/TxtFileReader.java index 914305c6..a74ef8fc 100755 --- a/txtfilereader/src/main/java/com/alibaba/datax/plugin/reader/txtfilereader/TxtFileReader.java +++ b/txtfilereader/src/main/java/com/alibaba/datax/plugin/reader/txtfilereader/TxtFileReader.java @@ -182,6 +182,7 @@ public class TxtFileReader extends Reader { delimiterInStr)); } + UnstructuredStorageReaderUtil.validateCsvReaderConfig(this.originConfig); } @Override