Mirror of https://github.com/alibaba/DataX.git, synced 2025-05-02 17:22:15 +08:00
Merge pull request #889 from Mr-KIDBK/dev

hdfs: add an overwrite writeMode; rdbms: add a single-table split-factor parameter

This commit is contained in:
commit 42e50f626f
@@ -81,10 +81,10 @@ public class HdfsWriter extends Writer {
             //writeMode check
             this.writeMode = this.writerSliceConfig.getNecessaryValue(Key.WRITE_MODE, HdfsWriterErrorCode.REQUIRED_VALUE);
             writeMode = writeMode.toLowerCase().trim();
-            Set<String> supportedWriteModes = Sets.newHashSet("append", "nonconflict");
+            Set<String> supportedWriteModes = Sets.newHashSet("append", "nonconflict", "truncate");
             if (!supportedWriteModes.contains(writeMode)) {
                 throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
-                        String.format("Only the two writeMode values append and nonConflict are supported; the configured writeMode [%s] is not supported",
+                        String.format("Only the three writeMode values append, nonConflict and truncate are supported; the configured writeMode [%s] is not supported",
                                 writeMode));
             }
             this.writerSliceConfig.set(Key.WRITE_MODE, writeMode);
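For context, a minimal hdfswriter job snippet using the new mode might look like the sketch below. This is an illustrative configuration only: the defaultFS, path, fileName and fileType values are invented placeholders, not taken from this commit. Note that the check above lower-cases and trims the value first, so "truncate", "Truncate" and "TRUNCATE" are all accepted.

    {
        "writer": {
            "name": "hdfswriter",
            "parameter": {
                "defaultFS": "hdfs://namenode:8020",
                "path": "/user/hive/warehouse/demo_table",
                "fileName": "demo",
                "fileType": "orc",
                "writeMode": "truncate"
            }
        }
    }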
@@ -179,6 +179,9 @@ public class HdfsWriter extends Writer {
                     LOG.error(String.format("List of conflicting files: [%s]", StringUtils.join(allFiles, ",")));
                     throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
                             String.format("writeMode is nonConflict, but the configured path [%s] is not empty; it contains other files or directories.", path));
+                }else if ("truncate".equalsIgnoreCase(writeMode) && isExistFile) {
+                    LOG.info(String.format("writeMode is truncate; the existing contents under [%s] will be overwritten", path));
+                    hdfsHelper.deleteFiles(existFilePaths);
                 }
             }else{
                 throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
@@ -25,4 +25,6 @@ public final class Constant {
 
     public static String TABLE_NAME_PLACEHOLDER = "@table";
 
+    public static Integer SPLIT_FACTOR = 5;
+
 }
@@ -46,5 +46,6 @@ public final class Key {
 
     public final static String DRYRUN = "dryRun";
 
+    public static String SPLIT_FACTOR = "splitFactor";
 
 }
@@ -68,7 +68,12 @@ public final class ReaderSplitUtil {
                 //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 2 + 1;// should not add 1, which causes a long tail
 
                 //consider other ratio numbers? (splitPk is null, ignore this long tail)
-                eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5;
+                //eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * 5;
+
+                //to avoid creating small files when importing into Hive, the factor defaults to 5 and can be configured via splitFactor
+                //the final task count is ceil(channel / tableNum) * splitFactor
+                Integer splitFactor = originalSliceConfig.getInt(Key.SPLIT_FACTOR, Constant.SPLIT_FACTOR);
+                eachTableShouldSplittedNumber = eachTableShouldSplittedNumber * splitFactor;
             }
             // try to split each table into eachTableShouldSplittedNumber shards
             for (String table : tables) {
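To make the new formula concrete: with channel = 10 and 3 tables, each table is split into ceil(10 / 3) = 4 shards, multiplied by the default splitFactor of 5 for 20 tasks per table; setting splitFactor to 1 restores roughly one task per shard. A reader snippet overriding the factor might look like the hypothetical sketch below. The plugin name, credentials and connection details are invented placeholders, and, judging from the surrounding branch ("splitPk is null, ignore this long tail"), splitFactor appears to take effect only when a splitPk is configured.

    {
        "reader": {
            "name": "rdbmsreader",
            "parameter": {
                "username": "demo_user",
                "password": "demo_pass",
                "column": ["id", "name"],
                "splitPk": "id",
                "splitFactor": 3,
                "connection": [{
                    "table": ["demo_table"],
                    "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/demo_db"]
                }]
            }
        }
    }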