mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 04:59:51 +08:00
Merge 4e6bbfa1ef
into 0824b45c5e
This commit is contained in:
commit
f808f88c51
@ -6,6 +6,11 @@ import java.util.StringJoiner;
|
||||
|
||||
public class CopySQLBuilder {
|
||||
private final static String COPY_SYNC = "copy.async";
|
||||
private final static String FIELD_DELIMITER_KEY = "file.column_separator";
|
||||
private final static String FIELD_DELIMITER_DEFAULT = "\t";
|
||||
private final static String LINE_DELIMITER_KEY = "file.line_delimiter";
|
||||
private final static String LINE_DELIMITER_DEFAULT = "\n";
|
||||
private final static String COLUMNS = "columns";
|
||||
private final String fileName;
|
||||
private final Keys options;
|
||||
private Map<String, Object> properties;
|
||||
@ -21,18 +26,38 @@ public class CopySQLBuilder {
|
||||
public String buildCopySQL(){
|
||||
StringBuilder sb = new StringBuilder();
|
||||
sb.append("COPY INTO ")
|
||||
.append(options.getDatabase() + "." + options.getTable())
|
||||
.append(" FROM @~('").append(fileName).append("') ")
|
||||
.append("PROPERTIES (");
|
||||
.append(options.getDatabase() + "." + options.getTable());
|
||||
|
||||
if (properties.get(COLUMNS) != null && !properties.get(COLUMNS).equals("")) {
|
||||
sb.append(" FROM ( SELECT ").append(properties.get(COLUMNS))
|
||||
.append(" FROM @~('").append(fileName).append("') ) ")
|
||||
.append("PROPERTIES (");
|
||||
} else {
|
||||
sb.append(" FROM @~('").append(fileName).append("') ")
|
||||
.append("PROPERTIES (");
|
||||
}
|
||||
|
||||
//copy into must be sync
|
||||
properties.put(COPY_SYNC,false);
|
||||
StringJoiner props = new StringJoiner(",");
|
||||
for(Map.Entry<String,Object> entry : properties.entrySet()){
|
||||
String key = String.valueOf(entry.getKey());
|
||||
String value = String.valueOf(entry.getValue());
|
||||
String prop = String.format("'%s'='%s'",key,value);
|
||||
props.add(prop);
|
||||
String value = "";
|
||||
switch (key){
|
||||
case FIELD_DELIMITER_KEY:
|
||||
value = DelimiterParser.parse(String.valueOf(entry.getValue()),FIELD_DELIMITER_DEFAULT);
|
||||
break;
|
||||
case LINE_DELIMITER_KEY:
|
||||
value = DelimiterParser.parse(String.valueOf(entry.getValue()),LINE_DELIMITER_DEFAULT);
|
||||
break;
|
||||
default:
|
||||
value = String.valueOf(entry.getValue());
|
||||
break;
|
||||
}
|
||||
if(!key.equals(COLUMNS)){
|
||||
String prop = String.format("'%s'='%s'", key, value);
|
||||
props.add(prop);
|
||||
}
|
||||
}
|
||||
sb.append(props).append(" )");
|
||||
return sb.toString();
|
||||
|
@ -40,7 +40,7 @@ public class SelectdbCopyIntoObserver {
|
||||
private CloseableHttpClient httpClient;
|
||||
private static final String UPLOAD_URL_PATTERN = "%s/copy/upload";
|
||||
private static final String COMMIT_PATTERN = "%s/copy/query";
|
||||
private static final Pattern COMMITTED_PATTERN = Pattern.compile("errCode = 2, detailMessage = No files can be copied, matched (\\d+) files, " + "filtered (\\d+) files because files may be loading or loaded");
|
||||
private static final Pattern COMMITTED_PATTERN = Pattern.compile("errCode = 2, detailMessage = No files can be copied.*");
|
||||
|
||||
|
||||
public SelectdbCopyIntoObserver(Keys options) {
|
||||
@ -188,21 +188,24 @@ public class SelectdbCopyIntoObserver {
|
||||
if(success){
|
||||
LOG.info("commit success cost {}ms, response is {}", System.currentTimeMillis() - start, loadResult);
|
||||
}else{
|
||||
throw new SelectdbWriterException("commit fail",true);
|
||||
LOG.error("commit error with status {}, reason {}, response {}", statusCode, reasonPhrase, loadResult);
|
||||
String copyErrMsg = String.format("commit error, status: %d, reason: %s, response: %s, copySQL: %s",
|
||||
statusCode, reasonPhrase, loadResult, copySQL);
|
||||
throw new SelectdbWriterException(copyErrMsg,true);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public boolean handleCommitResponse(String loadResult) throws IOException {
|
||||
BaseResponse<CopyIntoResp> baseResponse = OBJECT_MAPPER.readValue(loadResult, new TypeReference<BaseResponse<CopyIntoResp>>(){});
|
||||
BaseResponse baseResponse = OBJECT_MAPPER.readValue(loadResult, new TypeReference<BaseResponse>(){});
|
||||
if(baseResponse.getCode() == SUCCESS){
|
||||
CopyIntoResp dataResp = baseResponse.getData();
|
||||
CopyIntoResp dataResp = OBJECT_MAPPER.convertValue(baseResponse.getData(), CopyIntoResp.class);
|
||||
if(FAIL.equals(dataResp.getDataCode())){
|
||||
LOG.error("copy into execute failed, reason:{}", loadResult);
|
||||
return false;
|
||||
}else{
|
||||
Map<String, String> result = dataResp.getResult();
|
||||
if(!result.get("state").equals("FINISHED") && !isCommitted(result.get("msg"))){
|
||||
if(SelectdbUtil.isNullOrEmpty(result) || !result.get("state").equals("FINISHED") && !isCommitted(result.get("msg"))){
|
||||
LOG.error("copy into load failed, reason:{}", loadResult);
|
||||
return false;
|
||||
}else{
|
||||
|
@ -15,6 +15,7 @@ import java.sql.Statement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
/**
|
||||
* jdbc util
|
||||
@ -110,4 +111,8 @@ public class SelectdbUtil {
|
||||
return reference;
|
||||
}
|
||||
}
|
||||
|
||||
public static boolean isNullOrEmpty(Map<?, ?> map) {
|
||||
return map == null || map.isEmpty();
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user