hdfs writer adds overwirte mode

This commit is contained in:
daizihao 2020-10-14 20:35:19 +08:00
parent 3fd9c0a79f
commit 809cffc5f6

View File

@ -81,10 +81,10 @@ public class HdfsWriter extends Writer {
//writeMode check //writeMode check
this.writeMode = this.writerSliceConfig.getNecessaryValue(Key.WRITE_MODE, HdfsWriterErrorCode.REQUIRED_VALUE); this.writeMode = this.writerSliceConfig.getNecessaryValue(Key.WRITE_MODE, HdfsWriterErrorCode.REQUIRED_VALUE);
writeMode = writeMode.toLowerCase().trim(); writeMode = writeMode.toLowerCase().trim();
Set<String> supportedWriteModes = Sets.newHashSet("append", "nonconflict"); Set<String> supportedWriteModes = Sets.newHashSet("append", "nonconflict", "overwrite");
if (!supportedWriteModes.contains(writeMode)) { if (!supportedWriteModes.contains(writeMode)) {
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
String.format("仅支持append, nonConflict种模式, 不支持您配置的 writeMode 模式 : [%s]", String.format("仅支持append, nonConflict, overwrite三种模式, 不支持您配置的 writeMode 模式 : [%s]",
writeMode)); writeMode));
} }
this.writerSliceConfig.set(Key.WRITE_MODE, writeMode); this.writerSliceConfig.set(Key.WRITE_MODE, writeMode);
@ -179,6 +179,9 @@ public class HdfsWriter extends Writer {
LOG.error(String.format("冲突文件列表为: [%s]", StringUtils.join(allFiles, ","))); LOG.error(String.format("冲突文件列表为: [%s]", StringUtils.join(allFiles, ",")));
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,
String.format("由于您配置了writeMode nonConflict,但您配置的path: [%s] 目录不为空, 下面存在其他文件或文件夹.", path)); String.format("由于您配置了writeMode nonConflict,但您配置的path: [%s] 目录不为空, 下面存在其他文件或文件夹.", path));
}else if ("overwrite".equalsIgnoreCase(writeMode) && isExistFile) {
LOG.info(String.format("由于您配置了writeMode overwrite, [%s] 下面的内容将被覆盖重写", path));
hdfsHelper.deleteFiles(existFilePaths);
} }
}else{ }else{
throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE, throw DataXException.asDataXException(HdfsWriterErrorCode.ILLEGAL_VALUE,