diff --git a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/CopySQLBuilder.java b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/CopySQLBuilder.java index 62910d5d..f37c6dbe 100644 --- a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/CopySQLBuilder.java +++ b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/CopySQLBuilder.java @@ -6,6 +6,11 @@ import java.util.StringJoiner; public class CopySQLBuilder { private final static String COPY_SYNC = "copy.async"; + private final static String FIELD_DELIMITER_KEY = "file.column_separator"; + private final static String FIELD_DELIMITER_DEFAULT = "\t"; + private final static String LINE_DELIMITER_KEY = "file.line_delimiter"; + private final static String LINE_DELIMITER_DEFAULT = "\n"; + private final static String COLUMNS = "columns"; private final String fileName; private final Keys options; private Map properties; @@ -21,18 +26,38 @@ public class CopySQLBuilder { public String buildCopySQL(){ StringBuilder sb = new StringBuilder(); sb.append("COPY INTO ") - .append(options.getDatabase() + "." + options.getTable()) - .append(" FROM @~('").append(fileName).append("') ") - .append("PROPERTIES ("); + .append(options.getDatabase() + "." + options.getTable()); + + if (properties.get(COLUMNS) != null && !properties.get(COLUMNS).equals("")) { + sb.append(" FROM ( SELECT ").append(properties.get(COLUMNS)) + .append(" FROM @~('").append(fileName).append("') ) ") + .append("PROPERTIES ("); + } else { + sb.append(" FROM @~('").append(fileName).append("') ") + .append("PROPERTIES ("); + } //copy into must be sync properties.put(COPY_SYNC,false); StringJoiner props = new StringJoiner(","); for(Map.Entry entry : properties.entrySet()){ String key = String.valueOf(entry.getKey()); - String value = String.valueOf(entry.getValue()); - String prop = String.format("'%s'='%s'",key,value); - props.add(prop); + String value = ""; + switch (key){ + case FIELD_DELIMITER_KEY: + value = DelimiterParser.parse(String.valueOf(entry.getValue()),FIELD_DELIMITER_DEFAULT); + break; + case LINE_DELIMITER_KEY: + value = DelimiterParser.parse(String.valueOf(entry.getValue()),LINE_DELIMITER_DEFAULT); + break; + default: + value = String.valueOf(entry.getValue()); + break; + } + if(!key.equals(COLUMNS)){ + String prop = String.format("'%s'='%s'", key, value); + props.add(prop); + } } sb.append(props).append(" )"); return sb.toString(); diff --git a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCopyIntoObserver.java b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCopyIntoObserver.java index c9228b22..a05d5ca2 100644 --- a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCopyIntoObserver.java +++ b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCopyIntoObserver.java @@ -40,7 +40,7 @@ public class SelectdbCopyIntoObserver { private CloseableHttpClient httpClient; private static final String UPLOAD_URL_PATTERN = "%s/copy/upload"; private static final String COMMIT_PATTERN = "%s/copy/query"; - private static final Pattern COMMITTED_PATTERN = Pattern.compile("errCode = 2, detailMessage = No files can be copied, matched (\\d+) files, " + "filtered (\\d+) files because files may be loading or loaded"); + private static final Pattern COMMITTED_PATTERN = Pattern.compile("errCode = 2, detailMessage = No files can be copied.*"); public SelectdbCopyIntoObserver(Keys options) { @@ -188,21 +188,24 @@ public class SelectdbCopyIntoObserver { if(success){ LOG.info("commit success cost {}ms, response is {}", System.currentTimeMillis() - start, loadResult); }else{ - throw new SelectdbWriterException("commit fail",true); + LOG.error("commit error with status {}, reason {}, response {}", statusCode, reasonPhrase, loadResult); + String copyErrMsg = String.format("commit error, status: %d, reason: %s, response: %s, copySQL: %s", + statusCode, reasonPhrase, loadResult, copySQL); + throw new SelectdbWriterException(copyErrMsg,true); } } } public boolean handleCommitResponse(String loadResult) throws IOException { - BaseResponse baseResponse = OBJECT_MAPPER.readValue(loadResult, new TypeReference>(){}); + BaseResponse baseResponse = OBJECT_MAPPER.readValue(loadResult, new TypeReference(){}); if(baseResponse.getCode() == SUCCESS){ - CopyIntoResp dataResp = baseResponse.getData(); + CopyIntoResp dataResp = OBJECT_MAPPER.convertValue(baseResponse.getData(), CopyIntoResp.class); if(FAIL.equals(dataResp.getDataCode())){ LOG.error("copy into execute failed, reason:{}", loadResult); return false; }else{ Map result = dataResp.getResult(); - if(!result.get("state").equals("FINISHED") && !isCommitted(result.get("msg"))){ + if(SelectdbUtil.isNullOrEmpty(result) || !result.get("state").equals("FINISHED") && !isCommitted(result.get("msg"))){ LOG.error("copy into load failed, reason:{}", loadResult); return false; }else{ diff --git a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbUtil.java b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbUtil.java index 6cfcc8bf..3e7961e8 100644 --- a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbUtil.java +++ b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbUtil.java @@ -15,6 +15,7 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; /** * jdbc util @@ -110,4 +111,8 @@ public class SelectdbUtil { return reference; } } + + public static boolean isNullOrEmpty(Map map) { + return map == null || map.isEmpty(); + } }