add support: loadProps

This commit is contained in:
fariel 2021-03-15 14:33:04 +08:00
parent 51b42804be
commit 8ba8e4973b
7 changed files with 186 additions and 10 deletions

View File

@ -9,6 +9,8 @@ import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.dorisdb.connector.datax.plugin.writer.doriswriter.manager.DorisWriterManager;
import com.dorisdb.connector.datax.plugin.writer.doriswriter.row.DorisISerializer;
import com.dorisdb.connector.datax.plugin.writer.doriswriter.row.DorisSerializerFactory;
import com.dorisdb.connector.datax.plugin.writer.doriswriter.util.DorisWriterUtil;
import org.slf4j.Logger;
@ -86,11 +88,13 @@ public class DorisWriter extends Writer {
public static class Task extends Writer.Task {
private DorisWriterManager writerManager;
private DorisWriterOptions options;
private DorisISerializer rowSerializer;
@Override
public void init() {
    // Parse the plugin job configuration into strongly-typed writer options.
    options = new DorisWriterOptions(super.getPluginJobConf());
    writerManager = new DorisWriterManager(options);
    // Row serializer is selected from the loadProps `format` key (CSV when absent).
    rowSerializer = DorisSerializerFactory.createSerializer(options);
}
@Override
@ -110,15 +114,7 @@ public class DorisWriter extends Writer {
record.getColumnNumber(),
options.getColumns().size()));
}
StringBuilder sb = new StringBuilder();
for (int i = 0; i < record.getColumnNumber(); i++) {
Object value = record.getColumn(i).getRawData();
sb.append(null == value ? "\\N" : value);
if (i < record.getColumnNumber() - 1) {
sb.append("\t");
}
}
writerManager.writeRecord(sb.toString());
writerManager.writeRecord(rowSerializer.serialize(record));
}
} catch (Exception e) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);

View File

@ -7,6 +7,7 @@ import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import java.util.List;
import java.util.Map;
public class DorisWriterOptions implements Serializable {
@ -17,6 +18,11 @@ public class DorisWriterOptions implements Serializable {
private static final int BATCH_ROWS = 500000;
private static final long BATCH_BYTES = 100 * MEGA_BYTES_SCALE;
private static final String KEY_LOAD_PROPS_FORMAT = "format";
// Payload formats supported by Stream Load; CSV is the fallback when the
// `format` key is missing from loadProps.
public enum StreamLoadFormat {
    CSV, JSON;
}
private static final String KEY_USERNAME = "username";
private static final String KEY_PASSWORD = "password";
private static final String KEY_DATABASE = "database";
@ -26,6 +32,7 @@ public class DorisWriterOptions implements Serializable {
private static final String KEY_POST_SQL = "postSql";
private static final String KEY_JDBC_URL = "jdbcUrl";
private static final String KEY_LOAD_URL = "loadUrl";
private static final String KEY_LOAD_PROPS = "loadProps";
private final Configuration options;
@ -74,6 +81,10 @@ public class DorisWriterOptions implements Serializable {
return options.getList(KEY_POST_SQL, String.class);
}
/**
 * Returns the user-supplied `loadProps` map (forwarded as Stream Load request
 * headers by the visitor); may be null when the option is absent.
 */
public Map<String, Object> getLoadProps() {
    return options.getMap(KEY_LOAD_PROPS);
}
// Maximum retry attempts for writes; a fixed constant, not user-configurable.
public int getMaxRetries() {
    return MAX_RETRIES;
}
@ -86,6 +97,18 @@ public class DorisWriterOptions implements Serializable {
return BATCH_BYTES;
}
/**
 * Resolves the Stream Load data format from the `format` key of loadProps.
 * Only a case-insensitive "json" value selects JSON; anything else —
 * including a missing loadProps map or missing key — falls back to CSV.
 */
public StreamLoadFormat getStreamLoadFormat() {
    Map<String, Object> loadProps = getLoadProps();
    if (null != loadProps && loadProps.containsKey(KEY_LOAD_PROPS_FORMAT)) {
        String format = String.valueOf(loadProps.get(KEY_LOAD_PROPS_FORMAT));
        if (StreamLoadFormat.JSON.name().equalsIgnoreCase(format)) {
            return StreamLoadFormat.JSON;
        }
    }
    return StreamLoadFormat.CSV;
}
private void validateStreamLoadUrl() {
List<String> urlList = getLoadUrlList();
for (String host : urlList) {
@ -102,6 +125,7 @@ public class DorisWriterOptions implements Serializable {
KEY_PASSWORD,
KEY_DATABASE,
KEY_TABLE,
KEY_COLUMN,
KEY_LOAD_URL
};
for (String optionKey : requiredOptionKeys) {

View File

@ -91,8 +91,14 @@ public class DorisStreamLoadVisitor {
}
/**
 * Concatenates buffered serialized rows into a single Stream Load request body.
 * CSV rows are newline-delimited; JSON rows are wrapped into one JSON array.
 *
 * @param rows serialized rows (CSV lines or JSON objects)
 * @return UTF-8 encoded request body
 */
private byte[] joinRows(List<String> rows) {
    // Resolve once instead of re-reading loadProps for each branch.
    DorisWriterOptions.StreamLoadFormat format = writerOptions.getStreamLoadFormat();
    if (DorisWriterOptions.StreamLoadFormat.CSV.equals(format)) {
        return String.join("\n", rows).getBytes(StandardCharsets.UTF_8);
    }
    if (DorisWriterOptions.StreamLoadFormat.JSON.equals(format)) {
        return new StringBuilder("[").append(String.join(",", rows)).append("]")
                .toString().getBytes(StandardCharsets.UTF_8);
    }
    // Unreachable with the current two-value enum, but fail loudly and name the
    // offending format (the original message ended at a dangling colon).
    throw new RuntimeException(
            "Failed to join rows data, unsupported `format` from stream load properties: " + format);
}
@SuppressWarnings("unchecked")
private Map<String, Object> doHttpPut(String loadUrl, String label, byte[] data) throws IOException {
@ -110,6 +116,11 @@ public class DorisStreamLoadVisitor {
if (null != cols && !cols.isEmpty()) {
httpPut.setHeader("columns", String.join(",", cols));
}
if (null != writerOptions.getLoadProps()) {
for (Map.Entry<String, Object> entry : writerOptions.getLoadProps().entrySet()) {
httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
}
}
httpPut.setHeader("Expect", "100-continue");
httpPut.setHeader("label", label);
httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded");

View File

@ -0,0 +1,79 @@
package com.dorisdb.connector.datax.plugin.writer.doriswriter.row;
import java.io.StringWriter;
import com.alibaba.datax.common.element.Record;
import com.google.common.base.Strings;
/**
 * Serializes a DataX record as a single CSV line for Doris Stream Load.
 * Columns are joined by a configurable separator which may be given either
 * verbatim or as a `\xNN[NN...]` hex escape; null columns become the
 * literal {@code \N} marker.
 */
public class DorisCsvSerializer implements DorisISerializer {

    private static final long serialVersionUID = 1L;

    /** Hex digit alphabet used to validate and decode `\xNN` separators. */
    private static final String HEX_STRING = "0123456789ABCDEF";

    /** Decoded column separator; tab by default. */
    private final String columnSeparator;

    public DorisCsvSerializer(String sp) {
        this.columnSeparator = parseByteSeparator(sp);
    }

    @Override
    public String serialize(Record row) {
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < row.getColumnNumber(); i++) {
            Object value = row.getColumn(i).getRawData();
            sb.append(null == value ? "\\N" : value);
            if (i < row.getColumnNumber() - 1) {
                sb.append(columnSeparator);
            }
        }
        return sb.toString();
    }

    /**
     * Parses the user-supplied separator. Plain strings are used verbatim; a
     * `\xNN[NN...]` escape is validated and decoded byte-by-byte.
     *
     * @param sp raw separator option, may be null or empty
     * @return usable separator string, `\t` by default
     * @throws RuntimeException when the hex escape is empty or malformed
     */
    private String parseByteSeparator(String sp) {
        if (Strings.isNullOrEmpty(sp)) {
            // `\t` by default
            return "\t";
        }
        if (!sp.toUpperCase().startsWith("\\X")) {
            return sp;
        }
        String hexStr = sp.substring(2);
        // check hex str
        if (hexStr.isEmpty()) {
            throw new RuntimeException("Failed to parse column_separator: `Hex str is empty`");
        }
        if (hexStr.length() % 2 != 0) {
            throw new RuntimeException("Failed to parse column_separator: `Hex str length error`");
        }
        for (char hexChar : hexStr.toUpperCase().toCharArray()) {
            if (HEX_STRING.indexOf(hexChar) == -1) {
                throw new RuntimeException("Failed to parse column_separator: `Hex str format error`");
            }
        }
        // transform to separator
        StringWriter writer = new StringWriter();
        for (byte b : hexStrToBytes(hexStr)) {
            // Mask to the unsigned value: a plain (char) cast sign-extends
            // bytes >= 0x80 (e.g. \xFF would become U+FFFF instead of U+00FF).
            writer.append((char) (b & 0xFF));
        }
        return writer.toString();
    }

    /** Decodes a validated, even-length hex string into its raw bytes. */
    private byte[] hexStrToBytes(String hexStr) {
        String upperHexStr = hexStr.toUpperCase();
        int length = upperHexStr.length() / 2;
        char[] hexChars = upperHexStr.toCharArray();
        byte[] bytes = new byte[length];
        for (int i = 0; i < length; i++) {
            int pos = i * 2;
            bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1]));
        }
        return bytes;
    }

    /** Maps a single uppercase hex digit to its 0-15 value. */
    private byte charToByte(char c) {
        return (byte) HEX_STRING.indexOf(c);
    }
}

View File

@ -0,0 +1,11 @@
package com.dorisdb.connector.datax.plugin.writer.doriswriter.row;
import java.io.Serializable;
import com.alibaba.datax.common.element.Record;
/**
 * Converts a DataX {@link Record} into the textual payload (a CSV line or a
 * JSON object) sent to Doris via Stream Load.
 */
public interface DorisISerializer extends Serializable {

    /** Serializes a single record into its textual Stream Load form. */
    String serialize(Record row);
}

View File

@ -0,0 +1,33 @@
package com.dorisdb.connector.datax.plugin.writer.doriswriter.row;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import com.alibaba.datax.common.element.Record;
import com.alibaba.fastjson.JSON;
/**
 * Serializes a DataX record as a single JSON object keyed by the configured
 * column names. Returns an empty string when no field names were configured.
 */
public class DorisJsonSerializer implements DorisISerializer {

    private static final long serialVersionUID = 1L;

    /** Configured column names, positionally matched to record columns. */
    private final List<String> fieldNames;

    public DorisJsonSerializer(List<String> fieldNames) {
        this.fieldNames = fieldNames;
    }

    @Override
    public String serialize(Record row) {
        if (null == fieldNames) {
            return "";
        }
        Map<String, Object> rowMap = new HashMap<>(fieldNames.size());
        // The i-th configured field name maps to the i-th record column.
        for (int i = 0; i < fieldNames.size(); i++) {
            rowMap.put(fieldNames.get(i), row.getColumn(i).getRawData());
        }
        return JSON.toJSONString(rowMap);
    }
}

View File

@ -0,0 +1,22 @@
package com.dorisdb.connector.datax.plugin.writer.doriswriter.row;
import java.util.Map;
import com.dorisdb.connector.datax.plugin.writer.doriswriter.DorisWriterOptions;
/**
 * Chooses the row serializer matching the configured Stream Load format:
 * CSV (honoring an optional `column_separator` load property) or JSON.
 */
public class DorisSerializerFactory {

    private DorisSerializerFactory() {}

    public static DorisISerializer createSerializer(DorisWriterOptions writerOptions) {
        switch (writerOptions.getStreamLoadFormat()) {
            case CSV: {
                Map<String, Object> props = writerOptions.getLoadProps();
                String separator = (null != props && props.containsKey("column_separator"))
                        ? String.valueOf(props.get("column_separator"))
                        : null;
                return new DorisCsvSerializer(separator);
            }
            case JSON:
                return new DorisJsonSerializer(writerOptions.getColumns());
            default:
                throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties.");
        }
    }
}