From ad7a0cce6bb0f0f9f9c0f59d3107d8ee6298d1af Mon Sep 17 00:00:00 2001 From: caoliang <245623257@qq.com> Date: Thu, 11 Jan 2024 12:01:57 +0800 Subject: [PATCH 1/3] Supports importing bitmap and other types --- .../writer/selectdbwriter/CopySQLBuilder.java | 37 ++++++++++++++++--- 1 file changed, 31 insertions(+), 6 deletions(-) diff --git a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/CopySQLBuilder.java b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/CopySQLBuilder.java index 62910d5d..f37c6dbe 100644 --- a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/CopySQLBuilder.java +++ b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/CopySQLBuilder.java @@ -6,6 +6,11 @@ import java.util.StringJoiner; public class CopySQLBuilder { private final static String COPY_SYNC = "copy.async"; + private final static String FIELD_DELIMITER_KEY = "file.column_separator"; + private final static String FIELD_DELIMITER_DEFAULT = "\t"; + private final static String LINE_DELIMITER_KEY = "file.line_delimiter"; + private final static String LINE_DELIMITER_DEFAULT = "\n"; + private final static String COLUMNS = "columns"; private final String fileName; private final Keys options; private Map properties; @@ -21,18 +26,38 @@ public class CopySQLBuilder { public String buildCopySQL(){ StringBuilder sb = new StringBuilder(); sb.append("COPY INTO ") - .append(options.getDatabase() + "." + options.getTable()) - .append(" FROM @~('").append(fileName).append("') ") - .append("PROPERTIES ("); + .append(options.getDatabase() + "." + options.getTable()); + + if (properties.get(COLUMNS) != null && !properties.get(COLUMNS).equals("")) { + sb.append(" FROM ( SELECT ").append(properties.get(COLUMNS)) + .append(" FROM @~('").append(fileName).append("') ) ") + .append("PROPERTIES ("); + } else { + sb.append(" FROM @~('").append(fileName).append("') ") + .append("PROPERTIES ("); + } //copy into must be sync properties.put(COPY_SYNC,false); StringJoiner props = new StringJoiner(","); for(Map.Entry entry : properties.entrySet()){ String key = String.valueOf(entry.getKey()); - String value = String.valueOf(entry.getValue()); - String prop = String.format("'%s'='%s'",key,value); - props.add(prop); + String value = ""; + switch (key){ + case FIELD_DELIMITER_KEY: + value = DelimiterParser.parse(String.valueOf(entry.getValue()),FIELD_DELIMITER_DEFAULT); + break; + case LINE_DELIMITER_KEY: + value = DelimiterParser.parse(String.valueOf(entry.getValue()),LINE_DELIMITER_DEFAULT); + break; + default: + value = String.valueOf(entry.getValue()); + break; + } + if(!key.equals(COLUMNS)){ + String prop = String.format("'%s'='%s'", key, value); + props.add(prop); + } } sb.append(props).append(" )"); return sb.toString(); From 329f7b28cf826ca1b8ba3826e5208345169611e0 Mon Sep 17 00:00:00 2001 From: caoliang <245623257@qq.com> Date: Thu, 11 Jan 2024 12:28:30 +0800 Subject: [PATCH 2/3] Supports importing bitmap and other types --- .../writer/selectdbwriter/SelectdbCopyIntoObserver.java | 4 ++-- .../datax/plugin/writer/selectdbwriter/SelectdbUtil.java | 5 +++++ 2 files changed, 7 insertions(+), 2 deletions(-) diff --git a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCopyIntoObserver.java b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCopyIntoObserver.java index c9228b22..bbc52e9e 100644 --- a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCopyIntoObserver.java +++ b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCopyIntoObserver.java @@ -40,7 +40,7 @@ public class SelectdbCopyIntoObserver { private CloseableHttpClient httpClient; private static final String UPLOAD_URL_PATTERN = "%s/copy/upload"; private static final String COMMIT_PATTERN = "%s/copy/query"; - private static final Pattern COMMITTED_PATTERN = Pattern.compile("errCode = 2, detailMessage = No files can be copied, matched (\\d+) files, " + "filtered (\\d+) files because files may be loading or loaded"); + private static final Pattern COMMITTED_PATTERN = Pattern.compile("errCode = 2, detailMessage = No files can be copied.*"); public SelectdbCopyIntoObserver(Keys options) { @@ -202,7 +202,7 @@ public class SelectdbCopyIntoObserver { return false; }else{ Map result = dataResp.getResult(); - if(!result.get("state").equals("FINISHED") && !isCommitted(result.get("msg"))){ + if(SelectdbUtil.isNullOrEmpty(result) || !result.get("state").equals("FINISHED") && !isCommitted(result.get("msg"))){ LOG.error("copy into load failed, reason:{}", loadResult); return false; }else{ diff --git a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbUtil.java b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbUtil.java index 6cfcc8bf..3e7961e8 100644 --- a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbUtil.java +++ b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbUtil.java @@ -15,6 +15,7 @@ import java.sql.Statement; import java.util.ArrayList; import java.util.Collections; import java.util.List; +import java.util.Map; /** * jdbc util @@ -110,4 +111,8 @@ public class SelectdbUtil { return reference; } } + + public static boolean isNullOrEmpty(Map map) { + return map == null || map.isEmpty(); + } } From 4e6bbfa1ef4e7b6b83a489108b96fe95785293fb Mon Sep 17 00:00:00 2001 From: caoliang <245623257@qq.com> Date: Thu, 29 Feb 2024 14:37:06 +0800 Subject: [PATCH 3/3] Optimize exception error messages --- .../writer/selectdbwriter/SelectdbCopyIntoObserver.java | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCopyIntoObserver.java b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCopyIntoObserver.java index bbc52e9e..a05d5ca2 100644 --- a/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCopyIntoObserver.java +++ b/selectdbwriter/src/main/java/com/alibaba/datax/plugin/writer/selectdbwriter/SelectdbCopyIntoObserver.java @@ -188,15 +188,18 @@ public class SelectdbCopyIntoObserver { if(success){ LOG.info("commit success cost {}ms, response is {}", System.currentTimeMillis() - start, loadResult); }else{ - throw new SelectdbWriterException("commit fail",true); + LOG.error("commit error with status {}, reason {}, response {}", statusCode, reasonPhrase, loadResult); + String copyErrMsg = String.format("commit error, status: %d, reason: %s, response: %s, copySQL: %s", + statusCode, reasonPhrase, loadResult, copySQL); + throw new SelectdbWriterException(copyErrMsg,true); } } } public boolean handleCommitResponse(String loadResult) throws IOException { - BaseResponse baseResponse = OBJECT_MAPPER.readValue(loadResult, new TypeReference>(){}); + BaseResponse baseResponse = OBJECT_MAPPER.readValue(loadResult, new TypeReference(){}); if(baseResponse.getCode() == SUCCESS){ - CopyIntoResp dataResp = baseResponse.getData(); + CopyIntoResp dataResp = OBJECT_MAPPER.convertValue(baseResponse.getData(), CopyIntoResp.class); if(FAIL.equals(dataResp.getDataCode())){ LOG.error("copy into execute failed, reason:{}", loadResult); return false;