diff --git a/doriswriter/doc/doriswriter.md b/doriswriter/doc/doriswriter.md index 344cd943..9f8f510c 100644 --- a/doriswriter/doc/doriswriter.md +++ b/doriswriter/doc/doriswriter.md @@ -20,34 +20,40 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter "reader": { "name": "mysqlreader", "parameter": { - "column": ["k1", "k2", "k3"], + "column": ["emp_no", "birth_date", "first_name","last_name","gender","hire_date"], "connection": [ { - "jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/db1"], - "table": ["t1"] + "jdbcUrl": ["jdbc:mysql://localhost:3306/demo"], + "table": ["employees_1"] } ], "username": "root", - "password": "", + "password": "xxxxx", "where": "" } }, "writer": { "name": "doriswriter", "parameter": { - "loadUrl": ["127.0.0.1:8030"], - "loadProps": {}, - "database": "db1", - "column": ["k1", "k2", "k3"], + "loadUrl": ["172.16.0.13:8030"], + "loadProps": { + }, + "column": ["emp_no", "birth_date", "first_name","last_name","gender","hire_date"], "username": "root", - "password": "", - "postSql": [], + "password": "xxxxxx", + "postSql": ["select count(1) from all_employees_info"], "preSql": [], "connection": [ - "jdbcUrl":"jdbc:mysql://127.0.0.1:9030/demo", - "table":["xxx"], - "selectedDatabase":"xxxx" - ] + { + "jdbcUrl": "jdbc:mysql://172.16.0.13:9030/demo", + "database": "demo", + "table": ["all_employees_info"] + } + ], + "loadProps": { + "format": "json", + "strip_outer_array": true + } } } } @@ -159,3 +165,22 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter - 描述:StreamLoad单次请求的超时时间, 单位毫秒(ms)。 - 必选:否 - 默认值:-1 + +### 类型转换 + +默认传入的数据均会被转为字符串,并以`\t`作为列分隔符,`\n`作为行分隔符,组成`csv`文件进行StreamLoad导入操作。 +如需更改列分隔符, 则正确配置 `loadProps` 即可: +```json +"loadProps": { + "column_separator": "\\x01", + "row_delimiter": "\\x02" +} +``` + +如需更改导入格式为`json`, 则正确配置 `loadProps` 即可: +```json +"loadProps": { + "format": "json", + "strip_outer_array": true +} +``` \ No newline at end of file diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisBaseSerializer.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisBaseSerializer.java index 6179ec71..c9aacc6e 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisBaseSerializer.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisBaseSerializer.java @@ -1,2 +1,23 @@ -package com.alibaba.datax.plugin.writer.doriswriter;public class DorisBaseSerializer { +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.datax.common.element.Column; + +public class DorisBaseSerializer { + protected String fieldConvertion( Column col) { + if (null == col.getRawData() || Column.Type.NULL == col.getType()) { + return null; + } + if ( Column.Type.BOOL == col.getType()) { + return String.valueOf(col.asLong()); + } + if ( Column.Type.BYTES == col.getType()) { + byte[] bts = (byte[])col.getRawData(); + long value = 0; + for (int i = 0; i < bts.length; i++) { + value += (bts[bts.length - i - 1] & 0xffL) << (8 * i); + } + return String.valueOf(value); + } + return col.asString(); + } } diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java deleted file mode 100644 index ef5889de..00000000 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java +++ /dev/null @@ -1,73 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.alibaba.datax.plugin.writer.doriswriter; - -import com.alibaba.datax.common.element.Column; -import com.alibaba.datax.common.element.DateColumn; -import com.alibaba.datax.common.element.Record; -import org.apache.commons.lang3.time.DateFormatUtils; - -import java.time.ZoneId; -import java.util.List; - -public abstract class DorisCodec { - protected final List fieldNames; - - public DorisCodec(final List fieldNames) { - this.fieldNames = fieldNames; - } - - public abstract String serialize(Record row); - - /** - * convert datax internal data to string - * - * @param col - * @return - */ - protected Object convertColumn(final Column col) { - if (null == col.getRawData()) { - return null; - } - Column.Type type = col.getType(); - switch (type) { - case BOOL: - case INT: - case LONG: - return col.asLong(); - case DOUBLE: - return col.asDouble(); - case STRING: - return col.asString(); - case DATE: { - final DateColumn.DateType dateType = ((DateColumn) col).getSubType(); - switch (dateType) { - case DATE: - return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd"); - case DATETIME: - return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd HH:mm:ss"); - default: - return col.asString(); - } - } - default: - // BAD, NULL, BYTES - return null; - } - } -} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java deleted file mode 100644 index 8fa8b2c4..00000000 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java +++ /dev/null @@ -1,49 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.alibaba.datax.plugin.writer.doriswriter; - -import com.alibaba.datax.common.element.Record; - -import java.util.ArrayList; -import java.util.List; - -public class DorisCsvCodec extends DorisCodec { - - private final String columnSeparator; - - public DorisCsvCodec(final List fieldNames, String columnSeparator) { - super(fieldNames); - this.columnSeparator = EscapeHandler.escapeString(columnSeparator); - } - - @Override - public String serialize(final Record row) { - if (null == this.fieldNames) { - return ""; - } - List list = new ArrayList<>(); - - for (int i = 0; i < this.fieldNames.size(); i++) { - Object value = this.convertColumn(row.getColumn(i)); - list.add(value != null ? value.toString() : "\\N"); - } - - return String.join(columnSeparator, list); - } - -} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvSerializer.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvSerializer.java index 298d1142..93c477d8 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvSerializer.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvSerializer.java @@ -1,2 +1,26 @@ -package com.alibaba.datax.plugin.writer.doriswriter;public class DorisCsvSerializer { +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.datax.common.element.Record; + +public class DorisCsvSerializer extends DorisBaseSerializer implements DorisSerializer{ + private static final long serialVersionUID = 1L; + + private final String columnSeparator; + + public DorisCsvSerializer(String sp) { + this.columnSeparator = DorisDelimiterParser.parse(sp, "\t"); + } + + @Override + public String serialize( Record row) { + StringBuilder sb = new StringBuilder(); + for (int i = 0; i < row.getColumnNumber(); i++) { + String value = fieldConvertion(row.getColumn(i)); + sb.append(null == value ? "\\N" : value); + if (i < row.getColumnNumber() - 1) { + sb.append(columnSeparator); + } + } + return sb.toString(); + } } diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisDelimiterParser.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisDelimiterParser.java index e3f4b46c..5c92af9f 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisDelimiterParser.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisDelimiterParser.java @@ -1,2 +1,54 @@ -package com.alibaba.datax.plugin.writer.doriswriter;public class DorisDelimiterParser { +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.google.common.base.Strings; + +import java.io.StringWriter; + +public class DorisDelimiterParser { + + private static final String HEX_STRING = "0123456789ABCDEF"; + + public static String parse(String sp, String dSp) throws RuntimeException { + if ( Strings.isNullOrEmpty(sp)) { + return dSp; + } + if (!sp.toUpperCase().startsWith("\\X")) { + return sp; + } + String hexStr = sp.substring(2); + // check hex str + if (hexStr.isEmpty()) { + throw new RuntimeException("Failed to parse delimiter: `Hex str is empty`"); + } + if (hexStr.length() % 2 != 0) { + throw new RuntimeException("Failed to parse delimiter: `Hex str length error`"); + } + for (char hexChar : hexStr.toUpperCase().toCharArray()) { + if (HEX_STRING.indexOf(hexChar) == -1) { + throw new RuntimeException("Failed to parse delimiter: `Hex str format error`"); + } + } + // transform to separator + StringWriter writer = new StringWriter(); + for (byte b : hexStrToBytes(hexStr)) { + writer.append((char) b); + } + return writer.toString(); + } + + private static byte[] hexStrToBytes(String hexStr) { + String upperHexStr = hexStr.toUpperCase(); + int length = upperHexStr.length() / 2; + char[] hexChars = upperHexStr.toCharArray(); + byte[] bytes = new byte[length]; + for (int i = 0; i < length; i++) { + int pos = i * 2; + bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1])); + } + return bytes; + } + + private static byte charToByte(char c) { + return (byte) HEX_STRING.indexOf(c); + } } diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java deleted file mode 100644 index 9980c937..00000000 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java +++ /dev/null @@ -1,66 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.alibaba.datax.plugin.writer.doriswriter; - -import java.util.ArrayList; -import java.util.List; - -// A wrapper class to hold a batch of loaded rows -public class DorisFlushBatch { - private final String format; - private final String lineDelimiter; - private String label; - private long byteSize = 0; - private List data = new ArrayList<>(); - - public DorisFlushBatch(String lineDelimiter, String format) { - this.lineDelimiter = EscapeHandler.escapeString(lineDelimiter); - this.format = format; - } - - public void setLabel(String label) { - this.label = label; - } - - public String getLabel() { - return label; - } - - public long getRows() { - return data.size(); - } - - public void putData(String row) { - data.add(row); - byteSize += row.getBytes().length; - } - - public String getData() { - String result; - if (Key.DEFAULT_FORMAT_CSV.equalsIgnoreCase(format)) { - result = String.join(this.lineDelimiter, data); - } else { - result = data.toString(); - } - return result; - } - - public long getSize() { - return byteSize; - } -} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java deleted file mode 100644 index 5486dbde..00000000 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java +++ /dev/null @@ -1,51 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.alibaba.datax.plugin.writer.doriswriter; - -import com.alibaba.datax.common.element.Record; -import com.alibaba.fastjson.JSON; -import com.alibaba.fastjson.serializer.SerializerFeature; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -// Convert DataX data to json -public class DorisJsonCodec extends DorisCodec { - private Map rowMap; - - public DorisJsonCodec(final List fieldNames) { - super(fieldNames); - this.rowMap = new HashMap<>(this.fieldNames.size()); - } - - @Override - public String serialize(final Record row) { - if (null == this.fieldNames) { - return ""; - } - - rowMap.clear(); - int idx = 0; - for (final String fieldName : this.fieldNames) { - rowMap.put(fieldName, this.convertColumn(row.getColumn(idx))); - ++idx; - } - return JSON.toJSONString(rowMap, SerializerFeature.WriteMapNullValue); - } -} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonSerializer.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonSerializer.java index e747c996..315e472c 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonSerializer.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonSerializer.java @@ -1,2 +1,33 @@ -package com.alibaba.datax.plugin.writer.doriswriter;public class DorisJsonSerializer { +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.datax.common.element.Record; +import com.alibaba.fastjson.JSON; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class DorisJsonSerializer extends DorisBaseSerializer implements DorisSerializer{ + + private static final long serialVersionUID = 1L; + + private final List fieldNames; + + public DorisJsonSerializer( List fieldNames) { + this.fieldNames = fieldNames; + } + + @Override + public String serialize( Record row) { + if (null == fieldNames) { + return ""; + } + Map rowMap = new HashMap<> (fieldNames.size()); + int idx = 0; + for (String fieldName : fieldNames) { + rowMap.put(fieldName, fieldConvertion(row.getColumn(idx))); + idx++; + } + return JSON.toJSONString(rowMap); + } } diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisSerializer.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisSerializer.java index a974c5e5..58759ecf 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisSerializer.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisSerializer.java @@ -1,2 +1,10 @@ -package com.alibaba.datax.plugin.writer.doriswriter;public class DorisSerializer { +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.datax.common.element.Record; + +import java.io.Serializable; + +public interface DorisSerializer extends Serializable { + + String serialize( Record row); } diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisSerializerFactory.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisSerializerFactory.java index 2bb1a1c1..8484e5b7 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisSerializerFactory.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisSerializerFactory.java @@ -1,2 +1,19 @@ -package com.alibaba.datax.plugin.writer.doriswriter;public class DorisSerializerFactory { +package com.alibaba.datax.plugin.writer.doriswriter; + +import java.util.Map; + +public class DorisSerializerFactory { + public DorisSerializerFactory(){ + + } + public static DorisSerializer createSerializer(DorisWriterOptions writerOptions) { + if (DorisWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) { + Map props = writerOptions.getLoadProps(); + return new DorisCsvSerializer(null == props || !props.containsKey("column_separator") ? null : String.valueOf(props.get("column_separator"))); + } + if (DorisWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) { + return new DorisJsonSerializer(writerOptions.getColumns()); + } + throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties."); + } } diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadExcetion.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadExcetion.java index c8e27e00..dc19b209 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadExcetion.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadExcetion.java @@ -1,2 +1,29 @@ -package com.alibaba.datax.plugin.writer.doriswriter;public class DorisStreamLoadExcetion { +package com.alibaba.datax.plugin.writer.doriswriter; + +import java.io.IOException; +import java.util.Map; + +public class DorisStreamLoadExcetion extends IOException { + + private final Map response; + private boolean reCreateLabel; + + public DorisStreamLoadExcetion(String message, Map response) { + super(message); + this.response = response; + } + + public DorisStreamLoadExcetion(String message, Map response, boolean reCreateLabel) { + super(message); + this.response = response; + this.reCreateLabel = reCreateLabel; + } + + public Map getFailedResponse() { + return response; + } + + public boolean needReCreateLabel() { + return reCreateLabel; + } } diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadVisitor.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadVisitor.java new file mode 100644 index 00000000..a18940cb --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisStreamLoadVisitor.java @@ -0,0 +1,233 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.fastjson.JSON; +import org.apache.commons.codec.binary.Base64; +import org.apache.http.HttpEntity; +import org.apache.http.client.config.RequestConfig; +import org.apache.http.client.methods.CloseableHttpResponse; +import org.apache.http.client.methods.HttpGet; +import org.apache.http.client.methods.HttpPut; +import org.apache.http.entity.ByteArrayEntity; +import org.apache.http.impl.client.CloseableHttpClient; +import org.apache.http.impl.client.DefaultRedirectStrategy; +import org.apache.http.impl.client.HttpClientBuilder; +import org.apache.http.impl.client.HttpClients; +import org.apache.http.util.EntityUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.HttpURLConnection; +import java.net.URL; +import java.nio.ByteBuffer; +import java.nio.charset.StandardCharsets; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +public class DorisStreamLoadVisitor { + private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadVisitor.class); + + private DorisWriterOptions options; + + private long pos; + private static final String RESULT_FAILED = "Fail"; + private static final String RESULT_LABEL_EXISTED = "Label Already Exists"; + private static final String LAEBL_STATE_VISIBLE = "VISIBLE"; + private static final String LAEBL_STATE_COMMITTED = "COMMITTED"; + private static final String RESULT_LABEL_PREPARE = "PREPARE"; + private static final String RESULT_LABEL_ABORTED = "ABORTED"; + private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN"; + + + public DorisStreamLoadVisitor(DorisWriterOptions options){ + this.options = options; + } + + public void streamLoad(DorisWriterTuple data) throws Exception { + String host = getAvailableHost(); + if(host == null){ + throw new IOException ("None of the host in `load_url` could be connected."); + } + String loadUrl = new StringBuilder(host) + .append("/api/") + .append(options.getDatabase()) + .append("/") + .append(options.getTable()) + .append("/_stream_load") + .toString(); + LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", data.getRows().size(), data.getBytes(), data.getLabel())); + Map loadResult = doHttpPut(loadUrl, data.getLabel(), joinRows(data.getRows(), data.getBytes().intValue())); + final String keyStatus = "Status"; + if (null == loadResult || !loadResult.containsKey(keyStatus)) { + throw new IOException("Unable to flush data to Doris: unknown result status."); + } + LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString()); + if (RESULT_FAILED.equals(loadResult.get(keyStatus))) { + throw new IOException( + new StringBuilder("Failed to flush data to Doris.\n").append(JSON.toJSONString(loadResult)).toString() + ); + } else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) { + LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString()); + // has to block-checking the state to get the final result + checkLabelState(host, data.getLabel()); + } + } + + private void checkLabelState(String host, String label) throws IOException { + int idx = 0; + while(true) { + try { + TimeUnit.SECONDS.sleep(Math.min(++idx, 5)); + } catch (InterruptedException ex) { + break; + } + try (CloseableHttpClient httpclient = HttpClients.createDefault()) { + HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/get_load_state?label=").append(label).toString()); + httpGet.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword())); + httpGet.setHeader("Connection", "close"); + + try (CloseableHttpResponse resp = httpclient.execute(httpGet)) { + HttpEntity respEntity = getHttpEntity(resp); + if (respEntity == null) { + throw new IOException(String.format("Failed to flush data to Doris, Error " + + "could not get the final state of label[%s].\n", label), null); + } + Map result = (Map)JSON.parse(EntityUtils.toString(respEntity)); + String labelState = (String)result.get("state"); + if (null == labelState) { + throw new IOException(String.format("Failed to flush data to Doris, Error " + + "could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null); + } + LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState)); + switch(labelState) { + case LAEBL_STATE_VISIBLE: + case LAEBL_STATE_COMMITTED: + return; + case RESULT_LABEL_PREPARE: + continue; + case RESULT_LABEL_ABORTED: + throw new DorisStreamLoadExcetion(String.format("Failed to flush data to Doris, Error " + + "label[%s] state[%s]\n", label, labelState), null, true); + case RESULT_LABEL_UNKNOWN: + default: + throw new IOException(String.format("Failed to flush data to Doris, Error " + + "label[%s] state[%s]\n", label, labelState), null); + } + } + } + } + } + + private byte[] joinRows(List rows, int totalBytes) { + if (DorisWriterOptions.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) { + Map props = (options.getLoadProps() == null ? new HashMap<> () : options.getLoadProps()); + byte[] lineDelimiter = DorisDelimiterParser.parse((String)props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8); + ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length); + for (byte[] row : rows) { + bos.put(row); + bos.put(lineDelimiter); + } + return bos.array(); + } + + if (DorisWriterOptions.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) { + ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1)); + bos.put("[".getBytes(StandardCharsets.UTF_8)); + byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8); + boolean isFirstElement = true; + for (byte[] row : rows) { + if (!isFirstElement) { + bos.put(jsonDelimiter); + } + bos.put(row); + isFirstElement = false; + } + bos.put("]".getBytes(StandardCharsets.UTF_8)); + return bos.array(); + } + throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:"); + } + private Map doHttpPut(String loadUrl, String label, byte[] data) throws IOException { + LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length)); + final HttpClientBuilder httpClientBuilder = HttpClients.custom() + .setRedirectStrategy(new DefaultRedirectStrategy () { + @Override + protected boolean isRedirectable(String method) { + return true; + } + }); + try ( CloseableHttpClient httpclient = httpClientBuilder.build()) { + HttpPut httpPut = new HttpPut(loadUrl); + List cols = options.getColumns(); + if (null != cols && !cols.isEmpty() && DorisWriterOptions.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) { + httpPut.setHeader("columns", String.join(",", cols.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList()))); + } + if (null != options.getLoadProps()) { + for (Map.Entry entry : options.getLoadProps().entrySet()) { + httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue())); + } + } + httpPut.setHeader("Expect", "100-continue"); + httpPut.setHeader("label", label); + httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded"); + httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword())); + httpPut.setEntity(new ByteArrayEntity (data)); + httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build()); + try ( CloseableHttpResponse resp = httpclient.execute(httpPut)) { + HttpEntity respEntity = getHttpEntity(resp); + if (respEntity == null) + return null; + return (Map)JSON.parse(EntityUtils.toString(respEntity)); + } + } + } + + private String getBasicAuthHeader(String username, String password) { + String auth = username + ":" + password; + byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8)); + return new StringBuilder("Basic ").append(new String(encodedAuth)).toString(); + } + + private HttpEntity getHttpEntity(CloseableHttpResponse resp) { + int code = resp.getStatusLine().getStatusCode(); + if (200 != code) { + LOG.warn("Request failed with code:{}", code); + return null; + } + HttpEntity respEntity = resp.getEntity(); + if (null == respEntity) { + LOG.warn("Request failed with empty response."); + return null; + } + return respEntity; + } + + private String getAvailableHost() { + List hostList = options.getLoadUrlList(); + long tmp = pos + hostList.size(); + for (; pos < tmp; pos++) { + String host = new StringBuilder("http://").append(hostList.get((int) (pos % hostList.size()))).toString(); + if (tryHttpConnection(host)) { + return host; + } + } + return null; + } + + private boolean tryHttpConnection(String host) { + try { + URL url = new URL(host); + HttpURLConnection co = (HttpURLConnection) url.openConnection(); + co.setConnectTimeout(5000); + co.connect(); + co.disconnect(); + return true; + } catch (Exception e1) { + LOG.warn("Failed to connect to address:{}", host, e1); + return false; + } + } +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java new file mode 100644 index 00000000..7c7d9a92 --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisUtil.java @@ -0,0 +1,105 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.datax.plugin.rdbms.util.DBUtil; +import com.alibaba.datax.plugin.rdbms.util.DataBaseType; +import com.alibaba.datax.plugin.rdbms.util.RdbmsException; +import com.alibaba.datax.plugin.rdbms.writer.Constant; +import com.alibaba.druid.sql.parser.ParserException; +import com.google.common.base.Strings; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.Connection; +import java.sql.ResultSet; +import java.sql.Statement; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** + * jdbc util + */ +public class DorisUtil { + private static final Logger LOG = LoggerFactory.getLogger(DorisUtil.class); + + private DorisUtil() {} + + public static List getDorisTableColumns( Connection conn, String databaseName, String tableName) { + String currentSql = String.format("SELECT COLUMN_NAME FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA` = '%s' AND `TABLE_NAME` = '%s' ORDER BY `ORDINAL_POSITION` ASC;", databaseName, tableName); + List columns = new ArrayList<> (); + ResultSet rs = null; + try { + rs = DBUtil.query(conn, currentSql); + while (DBUtil.asyncResultSetNext(rs)) { + String colName = rs.getString("COLUMN_NAME"); + columns.add(colName); + } + return columns; + } catch (Exception e) { + throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null); + } finally { + DBUtil.closeDBResources(rs, null, null); + } + } + + public static List renderPreOrPostSqls(List preOrPostSqls, String tableName) { + if (null == preOrPostSqls) { + return Collections.emptyList(); + } + List renderedSqls = new ArrayList<>(); + for (String sql : preOrPostSqls) { + if (! Strings.isNullOrEmpty(sql)) { + renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName)); + } + } + return renderedSqls; + } + + public static void executeSqls(Connection conn, List sqls) { + Statement stmt = null; + String currentSql = null; + try { + stmt = conn.createStatement(); + for (String sql : sqls) { + currentSql = sql; + DBUtil.executeSqlWithoutResultSet(stmt, sql); + } + } catch (Exception e) { + throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null); + } finally { + DBUtil.closeDBResources(null, stmt, null); + } + } + + public static void preCheckPrePareSQL(DorisWriterOptions options) { + String table = options.getTable(); + List preSqls = options.getPreSqlList(); + List renderedPreSqls = DorisUtil.renderPreOrPostSqls(preSqls, table); + if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) { + LOG.info("Begin to preCheck preSqls:[{}].", String.join(";", renderedPreSqls)); + for (String sql : renderedPreSqls) { + try { + DBUtil.sqlValid(sql, DataBaseType.MySql); + } catch ( ParserException e) { + throw RdbmsException.asPreSQLParserException(DataBaseType.MySql,e,sql); + } + } + } + } + + public static void preCheckPostSQL(DorisWriterOptions options) { + String table = options.getTable(); + List postSqls = options.getPostSqlList(); + List renderedPostSqls = DorisUtil.renderPreOrPostSqls(postSqls, table); + if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) { + LOG.info("Begin to preCheck postSqls:[{}].", String.join(";", renderedPostSqls)); + for(String sql : renderedPostSqls) { + try { + DBUtil.sqlValid(sql, DataBaseType.MySql); + } catch (ParserException e){ + throw RdbmsException.asPostSQLParserException(DataBaseType.MySql,e,sql); + } + } + } + } +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java index 5e0bd205..f16cd354 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java @@ -39,131 +39,41 @@ import java.util.Collections; import java.util.List; import java.util.UUID; +/** + * doris data writer + */ public class DorisWriter extends Writer { - public DorisWriter() { - } - public static class Task extends com.alibaba.datax.common.spi.Writer.Task { - private DorisWriterEmitter dorisWriterEmitter; - private Key keys; - private DorisCodec rowCodec; - private int batchNum = 0; - private String labelPrefix; + public static class Job extends Writer.Job { - public Task() { - } - - @Override - public void init() { - this.keys = new Key(super.getPluginJobConf()); - if (Key.DEFAULT_FORMAT_CSV.equalsIgnoreCase(this.keys.getFormat())) { - this.rowCodec = new DorisCsvCodec(this.keys.getColumns(), this.keys.getColumnSeparator()); - } else { - this.rowCodec = new DorisJsonCodec(this.keys.getColumns()); - } - this.labelPrefix = this.keys.getLabelPrefix() + UUID.randomUUID(); - this.dorisWriterEmitter = new DorisWriterEmitter(keys); - } - - @Override - public void prepare() { - } - - @Override - public void startWrite(RecordReceiver recordReceiver) { - String lineDelimiter = this.keys.getLineDelimiter(); - DorisFlushBatch flushBatch = new DorisFlushBatch(lineDelimiter, this.keys.getFormat()); - long batchCount = 0; - long batchByteSize = 0L; - Record record; - // loop to get record from datax - while ((record = recordReceiver.getFromReader()) != null) { - // check column size - if (record.getColumnNumber() != this.keys.getColumns().size()) { - throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, - String.format("config writer column info error. because the column number of reader is :%s" + - "and the column number of writer is:%s. please check you datax job config json.", - record.getColumnNumber(), this.keys.getColumns().size())); - } - // codec record - final String recordStr = this.rowCodec.serialize(record); - - // put into buffer - flushBatch.putData(recordStr); - batchCount += 1; - batchByteSize += recordStr.length(); - // trigger buffer - if (batchCount >= this.keys.getBatchRows() || batchByteSize >= this.keys.getBatchByteSize()) { - // generate doris stream load label - flush (flushBatch); - // clear buffer - batchCount = 0; - batchByteSize = 0L; - flushBatch = new DorisFlushBatch (lineDelimiter, this.keys.getFormat()); - } - } // end of while - - if (flushBatch.getSize() > 0) { - flush(flushBatch); - } - } - - private void flush(DorisFlushBatch flushBatch) { - flushBatch.setLabel(getStreamLoadLabel()); - dorisWriterEmitter.emit(flushBatch); - } - - private String getStreamLoadLabel() { - return labelPrefix + "_" + (batchNum++); - } - - @Override - public void post() { - - } - - @Override - public void destroy() { - } - - @Override - public boolean supportFailOver() { - return false; - } - } - - public static class Job extends com.alibaba.datax.common.spi.Writer.Job { - private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.Job.class); + private static final Logger LOG = LoggerFactory.getLogger(Job.class); private Configuration originalConfig = null; - private Key keys; - - public Job() { - } + private DorisWriterOptions options; @Override public void init() { this.originalConfig = super.getPluginJobConf(); - this.keys = new Key(super.getPluginJobConf()); - this.keys.doPretreatment(); + options = new DorisWriterOptions(super.getPluginJobConf()); + options.doPretreatment(); } @Override - public void preCheck() { + public void preCheck(){ this.init(); - this.preCheckPrePareSQL(this.keys); - this.preCheckPostSQL(this.keys); + DorisUtil.preCheckPrePareSQL(options); + DorisUtil.preCheckPostSQL(options); } @Override public void prepare() { - String username = this.keys.getUsername(); - String password = this.keys.getPassword(); - String jdbcUrl = this.keys.getJdbcUrl(); - List renderedPreSqls = this.renderPreOrPostSqls(this.keys.getPreSqlList(), this.keys.getTable()); - if (!renderedPreSqls.isEmpty()) { + String username = options.getUsername(); + String password = options.getPassword(); + String jdbcUrl = options.getJdbcUrl(); + List renderedPreSqls = DorisUtil.renderPreOrPostSqls(options.getPreSqlList(), options.getTable()); + if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) { Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password); - LOG.info("prepare execute preSqls:[{}]. doris jdbc url:{}.", String.join(";", renderedPreSqls), jdbcUrl); - this.executeSqls(conn, renderedPreSqls); + LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPreSqls), jdbcUrl); + DorisUtil.executeSqls(conn, renderedPreSqls); DBUtil.closeDBResources(null, null, conn); } } @@ -171,93 +81,89 @@ public class DorisWriter extends Writer { @Override public List split(int mandatoryNumber) { List configurations = new ArrayList<>(mandatoryNumber); - - for (int i = 0; i < mandatoryNumber; ++i) { - configurations.add(this.originalConfig); + for (int i = 0; i < mandatoryNumber; i++) { + configurations.add(originalConfig); } - return configurations; } @Override public void post() { - String username = this.keys.getUsername(); - String password = this.keys.getPassword(); - String jdbcUrl = this.keys.getJdbcUrl(); - List renderedPostSqls = this.renderPreOrPostSqls(this.keys.getPostSqlList(), this.keys.getTable()); - if (!renderedPostSqls.isEmpty()) { + String username = options.getUsername(); + String password = options.getPassword(); + String jdbcUrl = options.getJdbcUrl(); + LOG.info("userName :{},password:{},jdbcUrl:{}.", username,password,jdbcUrl); + List renderedPostSqls = DorisUtil.renderPreOrPostSqls(options.getPostSqlList(), options.getTable()); + if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) { Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password); - LOG.info("prepare execute postSqls:[{}]. doris jdbc url为:{}.", String.join(";", renderedPostSqls), jdbcUrl); - this.executeSqls(conn, renderedPostSqls); + LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPostSqls), jdbcUrl); + DorisUtil.executeSqls(conn, renderedPostSqls); DBUtil.closeDBResources(null, null, conn); } - } @Override public void destroy() { } - private List renderPreOrPostSqls(final List preOrPostSqls, final String tableName) { - if (null == preOrPostSqls) { - return Collections.emptyList(); + } + + public static class Task extends Writer.Task { + private DorisWriterManager writerManager; + private DorisWriterOptions options; + private DorisSerializer rowSerializer; + + @Override + public void init() { + options = new DorisWriterOptions(super.getPluginJobConf()); + if (options.isWildcardColumn()) { + Connection conn = DBUtil.getConnection(DataBaseType.MySql, options.getJdbcUrl(), options.getUsername(), options.getPassword()); + List columns = DorisUtil.getDorisTableColumns(conn, options.getDatabase(), options.getTable()); + options.setInfoCchemaColumns(columns); } - final List renderedSqls = new ArrayList<>(); - for (final String sql : preOrPostSqls) { - if (!Strings.isNullOrEmpty(sql)) { - renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName)); - } - } - return renderedSqls; + writerManager = new DorisWriterManager(options); + rowSerializer = DorisSerializerFactory.createSerializer(options); } - private void executeSqls(final Connection conn, final List sqls) { - Statement stmt = null; - String currentSql = null; + @Override + public void prepare() { + } + + public void startWrite(RecordReceiver recordReceiver) { try { - stmt = conn.createStatement(); - for (String s : sqls) { - final String sql = currentSql = s; - DBUtil.executeSqlWithoutResultSet(stmt, sql); + Record record; + while ((record = recordReceiver.getFromReader()) != null) { + if (record.getColumnNumber() != options.getColumns().size()) { + throw DataXException + .asDataXException( + DBUtilErrorCode.CONF_ERROR, + String.format( + "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.", + record.getColumnNumber(), + options.getColumns().size())); + } + writerManager.writeRecord(rowSerializer.serialize(record)); } } catch (Exception e) { - throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null); - } finally { - DBUtil.closeDBResources(null, stmt, null); + throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e); } } - private void preCheckPrePareSQL(final Key keys) { - final String table = keys.getTable(); - final List preSqls = keys.getPreSqlList(); - final List renderedPreSqls = renderPreOrPostSqls(preSqls, table); - if (!renderedPreSqls.isEmpty()) { - LOG.info("prepare check preSqls:[{}].", String.join(";", renderedPreSqls)); - for (final String sql : renderedPreSqls) { - try { - DBUtil.sqlValid(sql, DataBaseType.MySql); - } catch (ParserException e) { - throw RdbmsException.asPreSQLParserException(DataBaseType.MySql, e, sql); - } - } + @Override + public void post() { + try { + writerManager.close(); + } catch (Exception e) { + throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e); } } - private void preCheckPostSQL(final Key keys) { - final String table = keys.getTable(); - final List postSqls = keys.getPostSqlList(); - final List renderedPostSqls = renderPreOrPostSqls(postSqls, table); - if (!renderedPostSqls.isEmpty()) { - LOG.info("prepare check postSqls:[{}].", String.join(";", renderedPostSqls)); - for (final String sql : renderedPostSqls) { - try { - DBUtil.sqlValid(sql, DataBaseType.MySql); - } catch (ParserException e) { - throw RdbmsException.asPostSQLParserException(DataBaseType.MySql, e, sql); - } - } - } - } + @Override + public void destroy() {} + @Override + public boolean supportFailOver(){ + return false; + } } } diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java deleted file mode 100644 index ba5d6742..00000000 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java +++ /dev/null @@ -1,243 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.alibaba.datax.plugin.writer.doriswriter; - -import com.alibaba.datax.common.exception.DataXException; -import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; -import com.alibaba.fastjson.JSON; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import org.apache.commons.lang3.StringUtils; -import org.apache.http.HttpEntity; -import org.apache.http.HttpHeaders; -import org.apache.http.HttpRequest; -import org.apache.http.HttpResponse; -import org.apache.http.HttpStatus; -import org.apache.http.ProtocolException; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpGet; -import org.apache.http.client.methods.HttpHead; -import org.apache.http.client.methods.HttpPut; -import org.apache.http.client.methods.HttpUriRequest; -import org.apache.http.client.methods.RequestBuilder; -import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.DefaultRedirectStrategy; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.protocol.HttpContext; -import org.apache.http.util.EntityUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.net.URI; -import java.util.Base64; -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; -import java.net.URLEncoder; - -// Used to load batch of rows to Doris using stream load -public class DorisWriterEmitter { - private static final Logger LOG = LoggerFactory.getLogger(DorisWriterEmitter.class); - private final Key keys; - private int hostPos = 0; - private List targetHosts = Lists.newArrayList(); - - private RequestConfig requestConfig; - - public DorisWriterEmitter(final Key keys) { - this.keys = keys; - initHostList(); - initRequestConfig(); - } - - private void initRequestConfig() { - requestConfig = RequestConfig.custom().setConnectTimeout(this.keys.getConnectTimeout()).build(); - } - - // get target host from config - private void initHostList() { - List hosts = this.keys.getBeLoadUrlList(); - if (hosts == null || hosts.isEmpty()) { - hosts = this.keys.getFeLoadUrlList(); - } - if (hosts == null || hosts.isEmpty()) { - DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, - "Either beLoadUrl or feLoadUrl must be set"); - } - for (String beHost : hosts) { - targetHosts.add("http://" + beHost); - } - } - - public void emit(final DorisFlushBatch flushData) { - String host = this.getAvailableHost(); - for (int i = 0; i <= this.keys.getMaxRetries(); i++) { - try { - doStreamLoad(flushData, host); - return; - } catch (DataXException ex) { - if (i >= this.keys.getMaxRetries()) { - throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, ex); - } - LOG.error("StreamLoad error, switch host {} and retry: ", host, ex); - host = this.getAvailableHost(); - } - } - } - - /** - * execute doris stream load - */ - private void doStreamLoad(final DorisFlushBatch flushData, String host) { - long start = System.currentTimeMillis(); - if (StringUtils.isEmpty(host)) { - throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, "None of the load url can be connected."); - } - String loadUrl = host + "/api/" + this.keys.getDatabase() + "/" + this.keys.getTable() + "/_stream_load"; - // do http put request and get response - final Map loadResult; - try { - loadResult = this.doHttpPut(loadUrl, flushData); - } catch (IOException e) { - throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e); - } - - long cost = System.currentTimeMillis() - start; - LOG.info("StreamLoad response: " + JSON.toJSONString(loadResult) + ", cost(ms): " + cost); - final String keyStatus = "Status"; - if (null == loadResult || !loadResult.containsKey(keyStatus)) { - throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, "Unable to flush data to doris: unknown result status."); - } - if (loadResult.get(keyStatus).equals("Fail")) { - throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, "Failed to flush data to doris.\n" + JSON.toJSONString(loadResult)); - } - } - - /** - * loop to get target host - * - * @return - */ - private String getAvailableHost() { - if (this.hostPos >= targetHosts.size()) { - this.hostPos = 0; - } - - while (this.hostPos < targetHosts.size()) { - final String host = targetHosts.get(hostPos); - ++this.hostPos; - return host; - } - - return null; - } - - private Map doHttpPut(final String loadUrl, final DorisFlushBatch flushBatch) throws IOException { - LOG.info(String.format("Executing stream load to: '%s', size: %s, rows: %d", - loadUrl, flushBatch.getSize(), flushBatch.getRows())); - - final HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() { - @Override - protected boolean isRedirectable(final String method) { - return true; - } - - @Override - public HttpUriRequest getRedirect(HttpRequest request, HttpResponse response, HttpContext context) throws ProtocolException { - URI uri = this.getLocationURI(request, response, context); - String method = request.getRequestLine().getMethod(); - if (method.equalsIgnoreCase("HEAD")) { - return new HttpHead(uri); - } else if (method.equalsIgnoreCase("GET")) { - return new HttpGet(uri); - } else { - int status = response.getStatusLine().getStatusCode(); - return (HttpUriRequest) (status == 307 ? RequestBuilder.copy(request).setUri(uri).build() : new HttpGet(uri)); - } - } - }); - - try (final CloseableHttpClient httpclient = httpClientBuilder.build()) { - final HttpPut httpPut = new HttpPut(loadUrl); - final List cols = this.keys.getColumns(); - if (null != cols && !cols.isEmpty()) { - httpPut.setHeader("columns", String.join(",", cols.stream().map(item -> String.format("`%s`", item.trim().replace("`", ""))).collect(Collectors.toList()))); - } - - //set default header - setDefaultHeader(httpPut); - // put custom loadProps to http header - final Map loadProps = this.keys.getLoadProps(); - if (null != loadProps) { - for (final Map.Entry entry : loadProps.entrySet()) { - httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue())); - } - } - - // set other required headers - httpPut.setHeader(HttpHeaders.EXPECT, "100-continue"); - httpPut.setHeader(HttpHeaders.AUTHORIZATION, this.getBasicAuthHeader(this.keys.getUsername(), this.keys.getPassword())); - httpPut.setHeader("label", flushBatch.getLabel()); - - // Use ByteArrayEntity instead of StringEntity to handle Chinese correctly - httpPut.setEntity(new ByteArrayEntity(flushBatch.getData().getBytes())); - httpPut.setConfig(requestConfig); - - try (final CloseableHttpResponse resp = httpclient.execute(httpPut)) { - final int code = resp.getStatusLine().getStatusCode(); - if (HttpStatus.SC_OK != code) { - LOG.warn("Request failed with code:{}", code); - return null; - } - final HttpEntity respEntity = resp.getEntity(); - if (null == respEntity) { - LOG.warn("Request failed with empty response."); - return null; - } - return (Map) JSON.parse(EntityUtils.toString(respEntity)); - } - } - } - - /** - * Set default request headers in json and csv formats. - * csv default delimiters are \x01 and \x02 - */ - private void setDefaultHeader(HttpPut httpPut) { - if (Key.DEFAULT_FORMAT_CSV.equalsIgnoreCase(this.keys.getFormat())) { - httpPut.setHeader("line_delimiter", this.keys.getLineDelimiter()); - httpPut.setHeader("column_separator", this.keys.getColumnSeparator()); - } else { - httpPut.setHeader("format", "json"); - httpPut.setHeader("strip_outer_array", "true"); - httpPut.setHeader("fuzzy_parse", "true"); - } - } - - private String getBasicAuthHeader(final String username, final String password) { - final String auth = username + ":" + password; - final byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes()); - return "Basic " + new String(encodedAuth); - } - -} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java new file mode 100644 index 00000000..df034076 --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterManager.java @@ -0,0 +1,192 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.google.common.base.Strings; +import org.apache.commons.lang3.concurrent.BasicThreadFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.nio.charset.StandardCharsets; +import java.util.ArrayList; +import java.util.List; +import java.util.UUID; +import java.util.concurrent.Executors; +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +public class DorisWriterManager { + + private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class); + + private final DorisStreamLoadVisitor visitor; + private final DorisWriterOptions options; + private final List buffer = new ArrayList<> (); + private int batchCount = 0; + private long batchSize = 0; + private volatile boolean closed = false; + private volatile Exception flushException; + private final LinkedBlockingDeque flushQueue; + private ScheduledExecutorService scheduler; + private ScheduledFuture scheduledFuture; + + public DorisWriterManager(DorisWriterOptions options) { + this.options = options; + this.visitor = new DorisStreamLoadVisitor(options); + flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength()); + this.startScheduler(); + this.startAsyncFlushing(); + } + + public void startScheduler() { + stopScheduler(); + this.scheduler = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("Doris-interval-flush").daemon(true).build()); + this.scheduledFuture = this.scheduler.schedule(() -> { + synchronized (DorisWriterManager.this) { + if (!closed) { + try { + String label = createBatchLabel(); + LOG.info(String.format("Doris interval Sinking triggered: label[%s].", label)); + if (batchCount == 0) { + startScheduler(); + } + flush(label, false); + } catch (Exception e) { + flushException = e; + } + } + } + }, options.getFlushInterval(), TimeUnit.MILLISECONDS); + } + + public void stopScheduler() { + if (this.scheduledFuture != null) { + scheduledFuture.cancel(false); + this.scheduler.shutdown(); + } + } + + public final synchronized void writeRecord(String record) throws IOException { + checkFlushException(); + try { + byte[] bts = record.getBytes(StandardCharsets.UTF_8); + buffer.add(bts); + batchCount++; + batchSize += bts.length; + if (batchCount >= options.getBatchRows() || batchSize >= options.getBatchSize()) { + String label = createBatchLabel(); + LOG.debug(String.format("Doris buffer Sinking triggered: rows[%d] label[%s].", batchCount, label)); + flush(label, false); + } + } catch (Exception e) { + throw new IOException("Writing records to Doris failed.", e); + } + } + + public synchronized void flush(String label, boolean waitUtilDone) throws Exception { + checkFlushException(); + if (batchCount == 0) { + if (waitUtilDone) { + waitAsyncFlushingDone(); + } + return; + } + flushQueue.put(new DorisWriterTuple(label, batchSize, new ArrayList<>(buffer))); + if (waitUtilDone) { + // wait the last flush + waitAsyncFlushingDone(); + } + buffer.clear(); + batchCount = 0; + batchSize = 0; + } + + public synchronized void close() { + if (!closed) { + closed = true; + try { + String label = createBatchLabel(); + if (batchCount > 0) LOG.debug(String.format("Doris Sink is about to close: label[%s].", label)); + flush(label, true); + } catch (Exception e) { + throw new RuntimeException("Writing records to Doris failed.", e); + } + } + checkFlushException(); + } + + public String createBatchLabel() { + StringBuilder sb = new StringBuilder(); + if (! Strings.isNullOrEmpty(options.getLabelPrefix())) { + sb.append(options.getLabelPrefix()); + } + return sb.append(UUID.randomUUID().toString()) + .toString(); + } + + private void startAsyncFlushing() { + // start flush thread + Thread flushThread = new Thread(new Runnable(){ + public void run() { + while(true) { + try { + asyncFlush(); + } catch (Exception e) { + flushException = e; + } + } + } + }); + flushThread.setDaemon(true); + flushThread.start(); + } + + private void waitAsyncFlushingDone() throws InterruptedException { + // wait previous flushings + for (int i = 0; i <= options.getFlushQueueLength(); i++) { + flushQueue.put(new DorisWriterTuple("", 0l, null)); + } + checkFlushException(); + } + + private void asyncFlush() throws Exception { + DorisWriterTuple flushData = flushQueue.take(); + if (Strings.isNullOrEmpty(flushData.getLabel())) { + return; + } + stopScheduler(); + LOG.debug(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel())); + for (int i = 0; i <= options.getMaxRetries(); i++) { + try { + // flush to Doris with stream load + visitor.streamLoad(flushData); + LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel())); + startScheduler(); + break; + } catch (Exception e) { + LOG.warn("Failed to flush batch data to Doris, retry times = {}", i, e); + if (i >= options.getMaxRetries()) { + throw new IOException(e); + } + if (e instanceof DorisStreamLoadExcetion && ((DorisStreamLoadExcetion)e).needReCreateLabel()) { + String newLabel = createBatchLabel(); + LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel)); + flushData.setLabel(newLabel); + } + try { + Thread.sleep(1000l * Math.min(i + 1, 10)); + } catch (InterruptedException ex) { + Thread.currentThread().interrupt(); + throw new IOException("Unable to flush, interrupted while doing another attempt", e); + } + } + } + } + + private void checkFlushException() { + if (flushException != null) { + throw new RuntimeException("Writing records to Doris failed.", flushException); + } + } +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterOptions.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterOptions.java new file mode 100644 index 00000000..b4395d2e --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterOptions.java @@ -0,0 +1,174 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; + +import java.io.Serializable; +import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; + +public class DorisWriterOptions implements Serializable { + + private static final long serialVersionUID = 1l; + private static final long KILO_BYTES_SCALE = 1024l; + private static final long MEGA_BYTES_SCALE = KILO_BYTES_SCALE * KILO_BYTES_SCALE; + private static final int MAX_RETRIES = 3; + private static final int BATCH_ROWS = 500000; + private static final long BATCH_BYTES = 90 * MEGA_BYTES_SCALE; + private static final long FLUSH_INTERVAL = 300000; + + private static final String KEY_LOAD_PROPS_FORMAT = "format"; + public enum StreamLoadFormat { + CSV, JSON; + } + + private static final String KEY_USERNAME = "username"; + private static final String KEY_PASSWORD = "password"; + private static final String KEY_DATABASE = "connection[0].database"; + private static final String KEY_TABLE = "connection[0].table[0]"; + private static final String KEY_COLUMN = "column"; + private static final String KEY_PRE_SQL = "preSql"; + private static final String KEY_POST_SQL = "postSql"; + private static final String KEY_JDBC_URL = "connection[0].jdbcUrl"; + private static final String KEY_LABEL_PREFIX = "labelPrefix"; + private static final String KEY_MAX_BATCH_ROWS = "maxBatchRows"; + private static final String KEY_MAX_BATCH_SIZE = "maxBatchSize"; + private static final String KEY_FLUSH_INTERVAL = "flushInterval"; + private static final String KEY_LOAD_URL = "loadUrl"; + private static final String KEY_FLUSH_QUEUE_LENGTH = "flushQueueLength"; + private static final String KEY_LOAD_PROPS = "loadProps"; + + private final Configuration options; + private List infoCchemaColumns; + private List userSetColumns; + private boolean isWildcardColumn; + + public DorisWriterOptions(Configuration options) { + this.options = options; + this.userSetColumns = options.getList(KEY_COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList()); + if (1 == options.getList(KEY_COLUMN, String.class).size() && "*".trim().equals(options.getList(KEY_COLUMN, String.class).get(0))) { + this.isWildcardColumn = true; + } + } + + public void doPretreatment() { + validateRequired(); + validateStreamLoadUrl(); + } + + public String getJdbcUrl() { + return options.getString(KEY_JDBC_URL); + } + + public String getDatabase() { + return options.getString(KEY_DATABASE); + } + + public String getTable() { + return options.getString(KEY_TABLE); + } + + public String getUsername() { + return options.getString(KEY_USERNAME); + } + + public String getPassword() { + return options.getString(KEY_PASSWORD); + } + + public String getLabelPrefix() { + return options.getString(KEY_LABEL_PREFIX); + } + + public List getLoadUrlList() { + return options.getList(KEY_LOAD_URL, String.class); + } + + public List getColumns() { + if (isWildcardColumn) { + return this.infoCchemaColumns; + } + return this.userSetColumns; + } + + public boolean isWildcardColumn() { + return this.isWildcardColumn; + } + + public void setInfoCchemaColumns(List cols) { + this.infoCchemaColumns = cols; + } + + public List getPreSqlList() { + return options.getList(KEY_PRE_SQL, String.class); + } + + public List getPostSqlList() { + return options.getList(KEY_POST_SQL, String.class); + } + + public Map getLoadProps() { + return options.getMap(KEY_LOAD_PROPS); + } + + public int getMaxRetries() { + return MAX_RETRIES; + } + + public int getBatchRows() { + Integer rows = options.getInt(KEY_MAX_BATCH_ROWS); + return null == rows ? BATCH_ROWS : rows; + } + + public long getBatchSize() { + Long size = options.getLong(KEY_MAX_BATCH_SIZE); + return null == size ? BATCH_BYTES : size; + } + + public long getFlushInterval() { + Long interval = options.getLong(KEY_FLUSH_INTERVAL); + return null == interval ? FLUSH_INTERVAL : interval; + } + + public int getFlushQueueLength() { + Integer len = options.getInt(KEY_FLUSH_QUEUE_LENGTH); + return null == len ? 1 : len; + } + + public StreamLoadFormat getStreamLoadFormat() { + Map loadProps = getLoadProps(); + if (null == loadProps) { + return StreamLoadFormat.CSV; + } + if (loadProps.containsKey(KEY_LOAD_PROPS_FORMAT) + && StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(KEY_LOAD_PROPS_FORMAT)))) { + return StreamLoadFormat.JSON; + } + return StreamLoadFormat.CSV; + } + + private void validateStreamLoadUrl() { + List urlList = getLoadUrlList(); + for (String host : urlList) { + if (host.split(":").length < 2) { + throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, + "loadUrl的格式不正确,请输入 `fe_ip:fe_http_ip;fe_ip:fe_http_ip`。"); + } + } + } + + private void validateRequired() { + final String[] requiredOptionKeys = new String[]{ + KEY_USERNAME, + KEY_DATABASE, + KEY_TABLE, + KEY_COLUMN, + KEY_LOAD_URL + }; + for (String optionKey : requiredOptionKeys) { + options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE); + } + } +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterTuple.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterTuple.java new file mode 100644 index 00000000..0bc52daf --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterTuple.java @@ -0,0 +1,20 @@ +package com.alibaba.datax.plugin.writer.doriswriter; + +import java.util.List; + +public class DorisWriterTuple { + private String label; + private Long bytes; + private List rows; + + public DorisWriterTuple(String label,Long bytes,List rows){ + this.label = label; + this.rows = rows; + this.bytes = bytes; + } + + public String getLabel() { return label; } + public void setLabel(String label) { this.label = label; } + public Long getBytes() { return bytes; } + public List getRows() { return rows; } +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/EscapeHandler.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/EscapeHandler.java deleted file mode 100644 index 91b0fbdb..00000000 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/EscapeHandler.java +++ /dev/null @@ -1,42 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.alibaba.datax.plugin.writer.doriswriter; - -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -/** - * Handler for escape in properties. - */ -public class EscapeHandler { - public static final String ESCAPE_DELIMITERS_FLAGS = "\\x"; - public static final Pattern ESCAPE_PATTERN = Pattern.compile("\\\\x([0-9|a-f|A-F]{2})"); - - public static String escapeString(String source) { - if (source.startsWith(ESCAPE_DELIMITERS_FLAGS)) { - Matcher m = ESCAPE_PATTERN.matcher(source); - StringBuffer buf = new StringBuffer(); - while (m.find()) { - m.appendReplacement(buf, String.format("%s", (char) Integer.parseInt(m.group(1), 16))); - } - m.appendTail(buf); - return buf.toString(); - } - return source; - } -} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java deleted file mode 100644 index a70f6650..00000000 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java +++ /dev/null @@ -1,188 +0,0 @@ -// Licensed to the Apache Software Foundation (ASF) under one -// or more contributor license agreements. See the NOTICE file -// distributed with this work for additional information -// regarding copyright ownership. The ASF licenses this file -// to you under the Apache License, Version 2.0 (the -// "License"); you may not use this file except in compliance -// with the License. You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, -// software distributed under the License is distributed on an -// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY -// KIND, either express or implied. See the License for the -// specific language governing permissions and limitations -// under the License. - -package com.alibaba.datax.plugin.writer.doriswriter; - -import com.alibaba.datax.common.exception.DataXException; -import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; -import com.google.common.base.Strings; - -import java.io.Serializable; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -public class Key implements Serializable { - public static final String FE_LOAD_URL = "loadUrl"; - public static final String BE_LOAD_URL = "beLoadUrl"; - public static final String JDBC_URL = "connection[0].jdbcUrl"; - - public static final String DATABASE = "connection[0].selectedDatabase"; - public static final String TABLE = "connection[0].table[0]"; - public static final String COLUMN = "column"; - - public static final String USERNAME = "username"; - public static final String PASSWORD = "password"; - - public static final String PRE_SQL = "preSql"; - public static final String POST_SQL = "postSql"; - - public static final String LOAD_PROPS = "loadProps"; - public static final String LOAD_PROPS_LINE_DELIMITER = "line_delimiter"; - public static final String LOAD_PROPS_COLUMN_SEPARATOR = "column_separator"; - - public static final String MAX_BATCH_ROWS = "maxBatchRows"; - public static final String BATCH_BYTE_SIZE = "maxBatchSize"; - public static final String MAX_RETRIES = "maxRetries"; - public static final String LABEL_PREFIX = "labelPrefix"; - public static final String FORMAT = "format"; - public static final String CONNECT_TIMEOUT = "connectTimeout"; - private final Configuration options; - - private static final long DEFAULT_MAX_BATCH_ROWS = 500000; - - private static final long DEFAULT_BATCH_BYTE_SIZE = 90 * 1024 * 1024; - private static final int DEFAULT_MAX_RETRIES = 0; - - private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_"; - private static final String DEFAULT_COLUMN_SEPARATOR = "\\x01"; - private static final String DEFAULT_LINE_DELIMITER = "\\x02"; - public static final String DEFAULT_FORMAT_CSV = "csv"; - private static final String DEFAULT_TIME_ZONE = "+08:00"; - private static final int DEFAULT_CONNECT_TIMEOUT = -1; - - public Key(final Configuration options) { - this.options = options; - } - - public void doPretreatment() { - this.validateRequired(); - this.validateStreamLoadUrl(); - this.validateFormat(); - } - - public String getJdbcUrl() { - return this.options.getString(JDBC_URL); - } - - public String getDatabase() { - return this.options.getString(DATABASE); - } - - public String getTable() { - return this.options.getString(TABLE); - } - - public String getUsername() { - return this.options.getString(USERNAME); - } - - public String getPassword() { - return Strings.nullToEmpty(this.options.getString(PASSWORD)); - } - - public List getBeLoadUrlList() { - return this.options.getList(BE_LOAD_URL, String.class); - } - - public List getFeLoadUrlList() { - return this.options.getList(FE_LOAD_URL, String.class); - } - - public List getColumns() { - return this.options.getList(COLUMN, String.class); - } - - public List getPreSqlList() { - return this.options.getList(PRE_SQL, String.class); - } - - public List getPostSqlList() { - return this.options.getList(POST_SQL, String.class); - } - - public Map getLoadProps() { - return this.options.getMap(LOAD_PROPS, new HashMap<>()); - } - - public long getBatchRows() { - return this.options.getLong(MAX_BATCH_ROWS, DEFAULT_MAX_BATCH_ROWS); - } - - public long getBatchByteSize() { - return this.options.getLong(BATCH_BYTE_SIZE, DEFAULT_BATCH_BYTE_SIZE); - } - - public int getMaxRetries() { - return this.options.getInt(MAX_RETRIES, DEFAULT_MAX_RETRIES); - } - - public String getLabelPrefix() { - return this.options.getString(LABEL_PREFIX, DEFAULT_LABEL_PREFIX); - } - - public String getLineDelimiter() { - return getLoadProps().getOrDefault(LOAD_PROPS_LINE_DELIMITER, DEFAULT_LINE_DELIMITER).toString(); - } - - public String getFormat() { - return this.options.getString(FORMAT, DEFAULT_FORMAT_CSV); - } - - public String getColumnSeparator() { - return getLoadProps().getOrDefault(LOAD_PROPS_COLUMN_SEPARATOR, DEFAULT_COLUMN_SEPARATOR).toString(); - } - - public int getConnectTimeout() { - return this.options.getInt(CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT); - } - - - private void validateStreamLoadUrl() { - List urlList = this.getBeLoadUrlList(); - if (urlList == null) { - urlList = this.getFeLoadUrlList(); - } - if (urlList == null || urlList.isEmpty()) { - throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, "Either beLoadUrl or feLoadUrl must be set"); - } - - for (final String host : urlList) { - if (host.split(":").length < 2) { - throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, - "Invalid load url format. IF use FE hosts, should be like: fe_host:fe_http_port." - + " If use BE hosts, should be like: be_host:be_webserver_port"); - } - } - } - - private void validateFormat() { - String format = this.getFormat(); - if (!Arrays.asList("csv", "json").contains(format.toLowerCase())) { - throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, "format only supports csv or json"); - } - } - - private void validateRequired() { - final String[] requiredOptionKeys = new String[]{JDBC_URL, USERNAME, DATABASE, TABLE, COLUMN}; - for (final String optionKey : requiredOptionKeys) { - this.options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE); - } - } -}