Add a selectable stream-load row format (CSV or JSON) to the Doris DataX writer.

DorisWriter.Task no longer builds a tab-separated line inline; it asks
DorisSerializerFactory for a DorisISerializer and passes each serialized
record to the writer manager.

DorisWriterOptions exposes the optional "loadProps" map, derives the
StreamLoadFormat from its "format" entry (JSON when the value equals "json"
ignoring case, CSV otherwise or when loadProps is absent) and adds the column
key to the list of required option keys.

DorisStreamLoadVisitor joins rows with "\n" for CSV, wraps them in a
comma-separated "[...]" array for JSON, and copies every loadProps entry into
a header of the HTTP PUT request.

New package "row": DorisISerializer, DorisCsvSerializer (separator taken from
"column_separator", tab by default, hex notation accepted), DorisJsonSerializer
(column name to raw value map rendered with fastjson) and DorisSerializerFactory.

NOTE(review): the line structure and leading whitespace of this patch were
reconstructed from a copy whose newlines had been collapsed; line counts match
every hunk header, but exact indentation of context lines is an assumption.
If context matching fails, apply with whitespace checks relaxed.

diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriter.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriter.java
index 05cef7a4..d88581ec 100755
--- a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriter.java
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriter.java
@@ -9,6 +9,8 @@ import com.alibaba.datax.plugin.rdbms.util.DBUtil;
 import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
 import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
 import com.dorisdb.connector.datax.plugin.writer.doriswriter.manager.DorisWriterManager;
+import com.dorisdb.connector.datax.plugin.writer.doriswriter.row.DorisISerializer;
+import com.dorisdb.connector.datax.plugin.writer.doriswriter.row.DorisSerializerFactory;
 import com.dorisdb.connector.datax.plugin.writer.doriswriter.util.DorisWriterUtil;
 
 import org.slf4j.Logger;
@@ -86,11 +88,13 @@ public class DorisWriter extends Writer {
     public static class Task extends Writer.Task {
         private DorisWriterManager writerManager;
         private DorisWriterOptions options;
+        private DorisISerializer rowSerializer;
 
         @Override
         public void init() {
             options = new DorisWriterOptions(super.getPluginJobConf());
             writerManager = new DorisWriterManager(options);
+            rowSerializer = DorisSerializerFactory.createSerializer(options);
         }
 
         @Override
@@ -110,15 +114,7 @@ public class DorisWriter extends Writer {
                                                 record.getColumnNumber(),
                                                 options.getColumns().size()));
                     }
-                    StringBuilder sb = new StringBuilder();
-                    for (int i = 0; i < record.getColumnNumber(); i++) {
-                        Object value = record.getColumn(i).getRawData();
-                        sb.append(null == value ? "\\N" : value);
-                        if (i < record.getColumnNumber() - 1) {
-                            sb.append("\t");
-                        }
-                    }
-                    writerManager.writeRecord(sb.toString());
+                    writerManager.writeRecord(rowSerializer.serialize(record));
                 }
             } catch (Exception e) {
                 throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriterOptions.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriterOptions.java
index 1d9cb2be..b4dbb0a3 100644
--- a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriterOptions.java
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/DorisWriterOptions.java
@@ -7,6 +7,7 @@ import com.alibaba.datax.common.util.Configuration;
 import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
 
 import java.util.List;
+import java.util.Map;
 
 public class DorisWriterOptions implements Serializable {
 
@@ -17,6 +18,11 @@ public class DorisWriterOptions implements Serializable {
     private static final int BATCH_ROWS = 500000;
     private static final long BATCH_BYTES = 100 * MEGA_BYTES_SCALE;
 
+    private static final String KEY_LOAD_PROPS_FORMAT = "format";
+    public enum StreamLoadFormat {
+        CSV, JSON;
+    }
+
     private static final String KEY_USERNAME = "username";
     private static final String KEY_PASSWORD = "password";
     private static final String KEY_DATABASE = "database";
@@ -26,6 +32,7 @@ public class DorisWriterOptions implements Serializable {
     private static final String KEY_POST_SQL = "postSql";
     private static final String KEY_JDBC_URL = "jdbcUrl";
     private static final String KEY_LOAD_URL = "loadUrl";
+    private static final String KEY_LOAD_PROPS = "loadProps";
 
     private final Configuration options;
 
@@ -74,6 +81,10 @@ public class DorisWriterOptions implements Serializable {
         return options.getList(KEY_POST_SQL, String.class);
     }
 
+    public Map<String, Object> getLoadProps() {
+        return options.getMap(KEY_LOAD_PROPS);
+    }
+
     public int getMaxRetries() {
         return MAX_RETRIES;
     }
@@ -86,6 +97,18 @@ public class DorisWriterOptions implements Serializable {
         return BATCH_BYTES;
     }
 
+    public StreamLoadFormat getStreamLoadFormat() {
+        Map<String, Object> loadProps = getLoadProps();
+        if (null == loadProps) {
+            return StreamLoadFormat.CSV;
+        }
+        if (loadProps.containsKey(KEY_LOAD_PROPS_FORMAT)
+            && StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(KEY_LOAD_PROPS_FORMAT)))) {
+            return StreamLoadFormat.JSON;
+        }
+        return StreamLoadFormat.CSV;
+    }
+
     private void validateStreamLoadUrl() {
         List<String> urlList = getLoadUrlList();
         for (String host : urlList) {
@@ -102,6 +125,7 @@ public class DorisWriterOptions implements Serializable {
             KEY_PASSWORD,
             KEY_DATABASE,
             KEY_TABLE,
+            KEY_COLUMN,
             KEY_LOAD_URL
         };
         for (String optionKey : requiredOptionKeys) {
diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java
index 90e4fdbb..68621d8f 100644
--- a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/manager/DorisStreamLoadVisitor.java
@@ -91,7 +91,13 @@ public class DorisStreamLoadVisitor {
     }
 
     private byte[] joinRows(List<String> rows) {
-        return String.join("\n", rows).getBytes(StandardCharsets.UTF_8);
+        if (DorisWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
+            return String.join("\n", rows).getBytes(StandardCharsets.UTF_8);
+        }
+        if (DorisWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
+            return new StringBuilder("[").append(String.join(",", rows)).append("]").toString().getBytes(StandardCharsets.UTF_8);
+        }
+        throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
     }
 
     @SuppressWarnings("unchecked")
@@ -110,6 +116,11 @@ public class DorisStreamLoadVisitor {
             if (null != cols && !cols.isEmpty()) {
                 httpPut.setHeader("columns", String.join(",", cols));
             }
+            if (null != writerOptions.getLoadProps()) {
+                for (Map.Entry<String, Object> entry : writerOptions.getLoadProps().entrySet()) {
+                    httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
+                }
+            }
             httpPut.setHeader("Expect", "100-continue");
             httpPut.setHeader("label", label);
             httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded");
diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisCsvSerializer.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisCsvSerializer.java
new file mode 100644
index 00000000..7a22b212
--- /dev/null
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisCsvSerializer.java
@@ -0,0 +1,79 @@
+package com.dorisdb.connector.datax.plugin.writer.doriswriter.row;
+
+import java.io.StringWriter;
+
+import com.alibaba.datax.common.element.Record;
+
+import com.google.common.base.Strings;
+
+public class DorisCsvSerializer implements DorisISerializer {
+
+    private static final long serialVersionUID = 1L;
+
+    private final String HEX_STRING = "0123456789ABCDEF";
+
+    private final String columnSeparator;
+
+    public DorisCsvSerializer(String sp) {
+        this.columnSeparator = parseByteSeparator(sp);
+    }
+
+    @Override
+    public String serialize(Record row) {
+        StringBuilder sb = new StringBuilder();
+        for (int i = 0; i < row.getColumnNumber(); i++) {
+            Object value = row.getColumn(i).getRawData();
+            sb.append(null == value ? "\\N" : value);
+            if (i < row.getColumnNumber() - 1) {
+                sb.append(columnSeparator);
+            }
+        }
+        return sb.toString();
+    }
+
+    private String parseByteSeparator(String sp) {
+        if (Strings.isNullOrEmpty(sp)) {
+            // `\t` by default
+            return "\t";
+        }
+        if (!sp.toUpperCase().startsWith("\\X")) {
+            return sp;
+        }
+        String hexStr = sp.substring(2);
+        // check hex str
+        if (hexStr.isEmpty()) {
+            throw new RuntimeException("Failed to parse column_separator: `Hex str is empty`");
+        }
+        if (hexStr.length() % 2 != 0) {
+            throw new RuntimeException("Failed to parse column_separator: `Hex str length error`");
+        }
+        for (char hexChar : hexStr.toUpperCase().toCharArray()) {
+            if (HEX_STRING.indexOf(hexChar) == -1) {
+                throw new RuntimeException("Failed to parse column_separator: `Hex str format error`");
+            }
+        }
+        // transform to separator
+        StringWriter writer = new StringWriter();
+        for (byte b : hexStrToBytes(hexStr)) {
+            writer.append((char) b);
+        }
+        return writer.toString();
+    }
+
+    private byte[] hexStrToBytes(String hexStr) {
+        String upperHexStr = hexStr.toUpperCase();
+        int length = upperHexStr.length() / 2;
+        char[] hexChars = upperHexStr.toCharArray();
+        byte[] bytes = new byte[length];
+        for (int i = 0; i < length; i++) {
+            int pos = i * 2;
+            bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1]));
+        }
+        return bytes;
+    }
+
+    private byte charToByte(char c) {
+        return (byte) HEX_STRING.indexOf(c);
+    }
+
+}
diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisISerializer.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisISerializer.java
new file mode 100644
index 00000000..92a73288
--- /dev/null
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisISerializer.java
@@ -0,0 +1,11 @@
+package com.dorisdb.connector.datax.plugin.writer.doriswriter.row;
+
+import java.io.Serializable;
+
+import com.alibaba.datax.common.element.Record;
+
+public interface DorisISerializer extends Serializable {
+
+    String serialize(Record row);
+
+}
diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisJsonSerializer.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisJsonSerializer.java
new file mode 100644
index 00000000..5da3e9ae
--- /dev/null
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisJsonSerializer.java
@@ -0,0 +1,33 @@
+package com.dorisdb.connector.datax.plugin.writer.doriswriter.row;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.fastjson.JSON;
+
+public class DorisJsonSerializer implements DorisISerializer {
+
+    private static final long serialVersionUID = 1L;
+
+    private final List<String> fieldNames;
+
+    public DorisJsonSerializer(List<String> fieldNames) {
+        this.fieldNames = fieldNames;
+    }
+
+    @Override
+    public String serialize(Record row) {
+        if (null == fieldNames) {
+            return "";
+        }
+        Map<String, Object> rowMap = new HashMap<>(fieldNames.size());
+        int idx = 0;
+        for (String fieldName : fieldNames) {
+            rowMap.put(fieldName, row.getColumn(idx++).getRawData());
+        }
+        return JSON.toJSONString(rowMap);
+    }
+
+}
diff --git a/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisSerializerFactory.java b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisSerializerFactory.java
new file mode 100644
index 00000000..0816399e
--- /dev/null
+++ b/doriswriter/src/main/java/com/dorisdb/connector/datax/plugin/writer/doriswriter/row/DorisSerializerFactory.java
@@ -0,0 +1,22 @@
+package com.dorisdb.connector.datax.plugin.writer.doriswriter.row;
+
+import java.util.Map;
+
+import com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriterOptions;
+
+public class DorisSerializerFactory {
+
+    private DorisSerializerFactory() {}
+
+    public static DorisISerializer createSerializer(DorisWriterOptions writerOptions) {
+        if (DorisWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
+            Map<String, Object> props = writerOptions.getLoadProps();
+            return new DorisCsvSerializer(null == props || !props.containsKey("column_separator") ? null : String.valueOf(props.get("column_separator")));
+        }
+        if (DorisWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
+            return new DorisJsonSerializer(writerOptions.getColumns());
+        }
+        throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties.");
+    }
+
+}