From 809cffc5f64798266cb18249382ebd74aa788bd3 Mon Sep 17 00:00:00 2001 From: daizihao Date: Wed, 14 Oct 2020 20:35:19 +0800 Subject: [PATCH 1/6] hdfs writer adds overwirte mode --- .../alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java index 0119be2b..07ad8091 100644 --- a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java +++ b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java @@ -81,10 +81,10 @@ public class HdfsWriter extends Writer { //writeMode check this.writeMode = this.writerSliceConfig.getNecessaryValue(Key.WRITE_MODE, HdfsWriterErrorCode.REQUIRED_VALUE); writeMode = writeMode.toLowerCase().trim(); - Set supportedWriteModes = Sets.newHashSet("append", "nonconflict"); + Set supportedWriteModes = Sets.newHashSet("append", "nonconflict", "overwrite"); if (!supportedWriteModes.contains(writeMode)) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, - String.format("仅支持append, nonConflict两种模式, 不支持您配置的 writeMode 模式 : [%s]", + String.format("仅支持append, nonConflict, overwrite三种模式, 不支持您配置的 writeMode 模式 : [%s]", writeMode)); } this.writerSliceConfig.set(Key.WRITE_MODE, writeMode); @@ -179,6 +179,9 @@ public class HdfsWriter extends Writer { LOG.error(String.format("冲突文件列表为: [%s]", StringUtils.join(allFiles, ","))); throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("由于您配置了writeMode nonConflict,但您配置的path: [%s] 目录不为空, 下面存在其他文件或文件夹.", path)); + }else if ("overwrite".equalsIgnoreCase(writeMode) && isExistFile) { + LOG.info(String.format("由于您配置了writeMode overwrite, [%s] 下面的内容将被覆盖重写", path)); + hdfsHelper.deleteFiles(existFilePaths); } }else{ throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, From b4a3eeb130a0f21656569942ec0514019f453fae Mon Sep 17 00:00:00 2001 From: daizihao Date: Wed, 14 Oct 2020 20:39:25 +0800 Subject: [PATCH 2/6] support for "splitFactor" parameterization --- .../com/alibaba/datax/plugin/rdbms/reader/Constant.java | 2 ++ .../java/com/alibaba/datax/plugin/rdbms/reader/Key.java | 1 + .../datax/plugin/rdbms/reader/util/ReaderSplitUtil.java | 7 ++++++- 3 files changed, 9 insertions(+), 1 deletion(-) diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Constant.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Constant.java index 729d71ac..f998357e 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Constant.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Constant.java @@ -25,4 +25,6 @@ public final class Constant { public static String TABLE_NAME_PLACEHOLDER = "@table"; + public static Integer SPLIT_FACTOR = 5; + } diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java index 63f8dde0..0e10c742 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/Key.java @@ -46,5 +46,6 @@ public final class Key { public final static String DRYRUN = "dryRun"; + public static String SPLIT_FACTOR = "splitFactor"; } \ No newline at end of file diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java index 64109e90..b1be9143 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java @@ -68,7 +68,12 @@ public final class ReaderSplitUtil { //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 2 + 1;// 不应该加1导致长尾 //考虑其他比率数字?(splitPk is null, 忽略此长尾) - eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5; + //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5; + + //为避免导入hive小文件 默认基数为5,也就是channel配置几个就是几个task,可以通过 pkQuota 配置基数 + // 最终task数为(channel/tableNum)向上取整*pkQuota + Integer quota = originalSliceConfig.getInt(Key.SPLIT_FACTOR, Constant.SPLIT_FACTOR); + eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * quota; } // 尝试对每个表,切分为eachTableShouldSplittedNumber 份 for (String table : tables) { From 46a5e90fd93bbfcb3d16f90fab9a259e9810f08a Mon Sep 17 00:00:00 2001 From: daizihao Date: Thu, 15 Oct 2020 09:25:07 +0800 Subject: [PATCH 3/6] support for "splitFactor" parameterization --- .../datax/plugin/rdbms/reader/util/ReaderSplitUtil.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java index b1be9143..0c140e92 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java @@ -70,8 +70,8 @@ public final class ReaderSplitUtil { //考虑其他比率数字?(splitPk is null, 忽略此长尾) //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5; - //为避免导入hive小文件 默认基数为5,也就是channel配置几个就是几个task,可以通过 pkQuota 配置基数 - // 最终task数为(channel/tableNum)向上取整*pkQuota + //为避免导入hive小文件 默认基数为5,也就是channel配置几个就是几个task,可以通过 splitFactor 配置基数 + // 最终task数为(channel/tableNum)向上取整*splitFactor Integer quota = originalSliceConfig.getInt(Key.SPLIT_FACTOR, Constant.SPLIT_FACTOR); eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * quota; } From dabb9a2fc341ad06dfcc98e0ab5c4ce3074e31a7 Mon Sep 17 00:00:00 2001 From: daizihao Date: Thu, 15 Oct 2020 09:26:19 +0800 Subject: [PATCH 4/6] support for "splitFactor" parameterization --- .../datax/plugin/rdbms/reader/util/ReaderSplitUtil.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java index 0c140e92..63c1ba05 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java @@ -72,8 +72,8 @@ public final class ReaderSplitUtil { //为避免导入hive小文件 默认基数为5,也就是channel配置几个就是几个task,可以通过 splitFactor 配置基数 // 最终task数为(channel/tableNum)向上取整*splitFactor - Integer quota = originalSliceConfig.getInt(Key.SPLIT_FACTOR, Constant.SPLIT_FACTOR); - eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * quota; + Integer splitFactor = originalSliceConfig.getInt(Key.SPLIT_FACTOR, Constant.SPLIT_FACTOR); + eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * splitFactor; } // 尝试对每个表,切分为eachTableShouldSplittedNumber 份 for (String table : tables) { From 3fb7fe7451970a9fb7431478e7dba4e870d01777 Mon Sep 17 00:00:00 2001 From: daizihao Date: Tue, 17 Nov 2020 19:12:24 +0800 Subject: [PATCH 5/6] support for "splitFactor" parameterization --- .../alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java index 63c1ba05..ed48ff8c 100755 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java +++ b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/reader/util/ReaderSplitUtil.java @@ -70,7 +70,7 @@ public final class ReaderSplitUtil { //考虑其他比率数字?(splitPk is null, 忽略此长尾) //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5; - //为避免导入hive小文件 默认基数为5,也就是channel配置几个就是几个task,可以通过 splitFactor 配置基数 + //为避免导入hive小文件 默认基数为5,可以通过 splitFactor 配置基数 // 最终task数为(channel/tableNum)向上取整*splitFactor Integer splitFactor = originalSliceConfig.getInt(Key.SPLIT_FACTOR, Constant.SPLIT_FACTOR); eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * splitFactor; From bcf800e7bf34ac31d8274c1ef78ad1632e7497e1 Mon Sep 17 00:00:00 2001 From: daizihao Date: Fri, 11 Dec 2020 17:31:47 +0800 Subject: [PATCH 6/6] =?UTF-8?q?=E4=BF=AE=E6=94=B9overwrite=E4=B8=BAtruncat?= =?UTF-8?q?e?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../datax/plugin/writer/hdfswriter/HdfsWriter.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java index 07ad8091..853613a2 100644 --- a/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java +++ b/hdfswriter/src/main/java/com/alibaba/datax/plugin/writer/hdfswriter/HdfsWriter.java @@ -81,10 +81,10 @@ public class HdfsWriter extends Writer { //writeMode check this.writeMode = this.writerSliceConfig.getNecessaryValue(Key.WRITE_MODE, HdfsWriterErrorCode.REQUIRED_VALUE); writeMode = writeMode.toLowerCase().trim(); - Set supportedWriteModes = Sets.newHashSet("append", "nonconflict", "overwrite"); + Set supportedWriteModes = Sets.newHashSet("append", "nonconflict", "truncate"); if (!supportedWriteModes.contains(writeMode)) { throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, - String.format("仅支持append, nonConflict, overwrite三种模式, 不支持您配置的 writeMode 模式 : [%s]", + String.format("仅支持append, nonConflict, truncate三种模式, 不支持您配置的 writeMode 模式 : [%s]", writeMode)); } this.writerSliceConfig.set(Key.WRITE_MODE, writeMode); @@ -179,8 +179,8 @@ public class HdfsWriter extends Writer { LOG.error(String.format("冲突文件列表为: [%s]", StringUtils.join(allFiles, ","))); throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, String.format("由于您配置了writeMode nonConflict,但您配置的path: [%s] 目录不为空, 下面存在其他文件或文件夹.", path)); - }else if ("overwrite".equalsIgnoreCase(writeMode) && isExistFile) { - LOG.info(String.format("由于您配置了writeMode overwrite, [%s] 下面的内容将被覆盖重写", path)); + }else if ("truncate".equalsIgnoreCase(writeMode) && isExistFile) { + LOG.info(String.format("由于您配置了writeMode truncate, [%s] 下面的内容将被覆盖重写", path)); hdfsHelper.deleteFiles(existFilePaths); } }else{