mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-02 05:11:42 +08:00
Refactoring doris writer code
Refactoring doris writer code
This commit is contained in:
parent
0efbc5df08
commit
8b46e82a60
@ -20,34 +20,40 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter
|
||||
"reader": {
|
||||
"name": "mysqlreader",
|
||||
"parameter": {
|
||||
"column": ["k1", "k2", "k3"],
|
||||
"column": ["emp_no", "birth_date", "first_name","last_name","gender","hire_date"],
|
||||
"connection": [
|
||||
{
|
||||
"jdbcUrl": ["jdbc:mysql://127.0.0.1:3306/db1"],
|
||||
"table": ["t1"]
|
||||
"jdbcUrl": ["jdbc:mysql://localhost:3306/demo"],
|
||||
"table": ["employees_1"]
|
||||
}
|
||||
],
|
||||
"username": "root",
|
||||
"password": "",
|
||||
"password": "xxxxx",
|
||||
"where": ""
|
||||
}
|
||||
},
|
||||
"writer": {
|
||||
"name": "doriswriter",
|
||||
"parameter": {
|
||||
"loadUrl": ["127.0.0.1:8030"],
|
||||
"loadProps": {},
|
||||
"database": "db1",
|
||||
"column": ["k1", "k2", "k3"],
|
||||
"loadUrl": ["172.16.0.13:8030"],
|
||||
"loadProps": {
|
||||
},
|
||||
"column": ["emp_no", "birth_date", "first_name","last_name","gender","hire_date"],
|
||||
"username": "root",
|
||||
"password": "",
|
||||
"postSql": [],
|
||||
"password": "xxxxxx",
|
||||
"postSql": ["select count(1) from all_employees_info"],
|
||||
"preSql": [],
|
||||
"connection": [
|
||||
"jdbcUrl":"jdbc:mysql://127.0.0.1:9030/demo",
|
||||
"table":["xxx"],
|
||||
"selectedDatabase":"xxxx"
|
||||
]
|
||||
{
|
||||
"jdbcUrl": "jdbc:mysql://172.16.0.13:9030/demo",
|
||||
"database": "demo",
|
||||
"table": ["all_employees_info"]
|
||||
}
|
||||
],
|
||||
"loadProps": {
|
||||
"format": "json",
|
||||
"strip_outer_array": true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -159,3 +165,22 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter
|
||||
- 描述:StreamLoad单次请求的超时时间, 单位毫秒(ms)。
|
||||
- 必选:否
|
||||
- 默认值:-1
|
||||
|
||||
### 类型转换
|
||||
|
||||
默认传入的数据均会被转为字符串,并以`\t`作为列分隔符,`\n`作为行分隔符,组成`csv`文件进行StreamLoad导入操作。
|
||||
如需更改列分隔符, 则正确配置 `loadProps` 即可:
|
||||
```json
|
||||
"loadProps": {
|
||||
"column_separator": "\\x01",
|
||||
"row_delimiter": "\\x02"
|
||||
}
|
||||
```
|
||||
|
||||
如需更改导入格式为`json`, 则正确配置 `loadProps` 即可:
|
||||
```json
|
||||
"loadProps": {
|
||||
"format": "json",
|
||||
"strip_outer_array": true
|
||||
}
|
||||
```
|
@ -1,2 +1,23 @@
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;public class DorisBaseSerializer {
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||
|
||||
import com.alibaba.datax.common.element.Column;
|
||||
|
||||
public class DorisBaseSerializer {
|
||||
protected String fieldConvertion( Column col) {
|
||||
if (null == col.getRawData() || Column.Type.NULL == col.getType()) {
|
||||
return null;
|
||||
}
|
||||
if ( Column.Type.BOOL == col.getType()) {
|
||||
return String.valueOf(col.asLong());
|
||||
}
|
||||
if ( Column.Type.BYTES == col.getType()) {
|
||||
byte[] bts = (byte[])col.getRawData();
|
||||
long value = 0;
|
||||
for (int i = 0; i < bts.length; i++) {
|
||||
value += (bts[bts.length - i - 1] & 0xffL) << (8 * i);
|
||||
}
|
||||
return String.valueOf(value);
|
||||
}
|
||||
return col.asString();
|
||||
}
|
||||
}
|
||||
|
@ -1,73 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||
|
||||
import com.alibaba.datax.common.element.Column;
|
||||
import com.alibaba.datax.common.element.DateColumn;
|
||||
import com.alibaba.datax.common.element.Record;
|
||||
import org.apache.commons.lang3.time.DateFormatUtils;
|
||||
|
||||
import java.time.ZoneId;
|
||||
import java.util.List;
|
||||
|
||||
public abstract class DorisCodec {
|
||||
protected final List<String> fieldNames;
|
||||
|
||||
public DorisCodec(final List<String> fieldNames) {
|
||||
this.fieldNames = fieldNames;
|
||||
}
|
||||
|
||||
public abstract String serialize(Record row);
|
||||
|
||||
/**
|
||||
* convert datax internal data to string
|
||||
*
|
||||
* @param col
|
||||
* @return
|
||||
*/
|
||||
protected Object convertColumn(final Column col) {
|
||||
if (null == col.getRawData()) {
|
||||
return null;
|
||||
}
|
||||
Column.Type type = col.getType();
|
||||
switch (type) {
|
||||
case BOOL:
|
||||
case INT:
|
||||
case LONG:
|
||||
return col.asLong();
|
||||
case DOUBLE:
|
||||
return col.asDouble();
|
||||
case STRING:
|
||||
return col.asString();
|
||||
case DATE: {
|
||||
final DateColumn.DateType dateType = ((DateColumn) col).getSubType();
|
||||
switch (dateType) {
|
||||
case DATE:
|
||||
return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd");
|
||||
case DATETIME:
|
||||
return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd HH:mm:ss");
|
||||
default:
|
||||
return col.asString();
|
||||
}
|
||||
}
|
||||
default:
|
||||
// BAD, NULL, BYTES
|
||||
return null;
|
||||
}
|
||||
}
|
||||
}
|
@ -1,49 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||
|
||||
import com.alibaba.datax.common.element.Record;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
public class DorisCsvCodec extends DorisCodec {
|
||||
|
||||
private final String columnSeparator;
|
||||
|
||||
public DorisCsvCodec(final List<String> fieldNames, String columnSeparator) {
|
||||
super(fieldNames);
|
||||
this.columnSeparator = EscapeHandler.escapeString(columnSeparator);
|
||||
}
|
||||
|
||||
@Override
|
||||
public String serialize(final Record row) {
|
||||
if (null == this.fieldNames) {
|
||||
return "";
|
||||
}
|
||||
List<String> list = new ArrayList<>();
|
||||
|
||||
for (int i = 0; i < this.fieldNames.size(); i++) {
|
||||
Object value = this.convertColumn(row.getColumn(i));
|
||||
list.add(value != null ? value.toString() : "\\N");
|
||||
}
|
||||
|
||||
return String.join(columnSeparator, list);
|
||||
}
|
||||
|
||||
}
|
@ -1,2 +1,26 @@
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;public class DorisCsvSerializer {
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||
|
||||
import com.alibaba.datax.common.element.Record;
|
||||
|
||||
public class DorisCsvSerializer extends DorisBaseSerializer implements DorisSerializer{
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private final String columnSeparator;
|
||||
|
||||
public DorisCsvSerializer(String sp) {
|
||||
this.columnSeparator = DorisDelimiterParser.parse(sp, "\t");
|
||||
}
|
||||
|
||||
@Override
|
||||
public String serialize( Record row) {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (int i = 0; i < row.getColumnNumber(); i++) {
|
||||
String value = fieldConvertion(row.getColumn(i));
|
||||
sb.append(null == value ? "\\N" : value);
|
||||
if (i < row.getColumnNumber() - 1) {
|
||||
sb.append(columnSeparator);
|
||||
}
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
}
|
||||
|
@ -1,2 +1,54 @@
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;public class DorisDelimiterParser {
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
|
||||
import java.io.StringWriter;
|
||||
|
||||
/**
 * Resolves a configured delimiter string into the characters actually used
 * as separator.
 *
 * <p>A value starting with {@code \x} (case-insensitive) is treated as a hex
 * string: every pair of hex digits becomes one character with that code
 * (0x00-0xFF). Any other non-empty value is used verbatim.
 */
public class DorisDelimiterParser {

    private static final String HEX_STRING = "0123456789ABCDEF";

    /**
     * Parses a delimiter setting.
     *
     * @param sp  the configured delimiter, possibly {@code null} or empty
     * @param dSp the default returned when {@code sp} is {@code null} or empty
     * @return {@code dSp} for a missing setting, {@code sp} itself when it does
     *         not start with {@code \x}, otherwise the decoded characters
     * @throws RuntimeException if the hex part is empty, has odd length, or
     *                          contains a non-hex character
     */
    public static String parse(String sp, String dSp) throws RuntimeException {
        if (sp == null || sp.isEmpty()) {
            return dSp;
        }
        // Case-insensitive prefix test that does not depend on the default locale.
        if (!sp.regionMatches(true, 0, "\\X", 0, 2)) {
            return sp;
        }
        String hexStr = sp.substring(2);
        // check hex str
        if (hexStr.isEmpty()) {
            throw new RuntimeException("Failed to parse delimiter: `Hex str is empty`");
        }
        if (hexStr.length() % 2 != 0) {
            throw new RuntimeException("Failed to parse delimiter: `Hex str length error`");
        }
        for (int i = 0; i < hexStr.length(); i++) {
            if (hexValue(hexStr.charAt(i)) == -1) {
                throw new RuntimeException("Failed to parse delimiter: `Hex str format error`");
            }
        }
        // transform to separator
        StringBuilder separator = new StringBuilder(hexStr.length() / 2);
        for (int i = 0; i < hexStr.length(); i += 2) {
            int high = hexValue(hexStr.charAt(i));
            int low = hexValue(hexStr.charAt(i + 1));
            // Combine as an unsigned value: casting a signed byte to char would
            // sign-extend 0x80-0xFF into U+FF80-U+FFFF.
            separator.append((char) ((high << 4) | low));
        }
        return separator.toString();
    }

    /** Returns the value (0-15) of an ASCII hex digit in either case, or -1 if {@code c} is not one. */
    private static int hexValue(char c) {
        return HEX_STRING.indexOf(Character.toUpperCase(c));
    }
}
|
||||
|
@ -1,66 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
// A wrapper class to hold a batch of loaded rows
|
||||
public class DorisFlushBatch {
|
||||
private final String format;
|
||||
private final String lineDelimiter;
|
||||
private String label;
|
||||
private long byteSize = 0;
|
||||
private List<String> data = new ArrayList<>();
|
||||
|
||||
public DorisFlushBatch(String lineDelimiter, String format) {
|
||||
this.lineDelimiter = EscapeHandler.escapeString(lineDelimiter);
|
||||
this.format = format;
|
||||
}
|
||||
|
||||
public void setLabel(String label) {
|
||||
this.label = label;
|
||||
}
|
||||
|
||||
public String getLabel() {
|
||||
return label;
|
||||
}
|
||||
|
||||
public long getRows() {
|
||||
return data.size();
|
||||
}
|
||||
|
||||
public void putData(String row) {
|
||||
data.add(row);
|
||||
byteSize += row.getBytes().length;
|
||||
}
|
||||
|
||||
public String getData() {
|
||||
String result;
|
||||
if (Key.DEFAULT_FORMAT_CSV.equalsIgnoreCase(format)) {
|
||||
result = String.join(this.lineDelimiter, data);
|
||||
} else {
|
||||
result = data.toString();
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public long getSize() {
|
||||
return byteSize;
|
||||
}
|
||||
}
|
@ -1,51 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||
|
||||
import com.alibaba.datax.common.element.Record;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.alibaba.fastjson.serializer.SerializerFeature;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
// Convert DataX data to json
|
||||
public class DorisJsonCodec extends DorisCodec {
|
||||
private Map<String, Object> rowMap;
|
||||
|
||||
public DorisJsonCodec(final List<String> fieldNames) {
|
||||
super(fieldNames);
|
||||
this.rowMap = new HashMap<>(this.fieldNames.size());
|
||||
}
|
||||
|
||||
@Override
|
||||
public String serialize(final Record row) {
|
||||
if (null == this.fieldNames) {
|
||||
return "";
|
||||
}
|
||||
|
||||
rowMap.clear();
|
||||
int idx = 0;
|
||||
for (final String fieldName : this.fieldNames) {
|
||||
rowMap.put(fieldName, this.convertColumn(row.getColumn(idx)));
|
||||
++idx;
|
||||
}
|
||||
return JSON.toJSONString(rowMap, SerializerFeature.WriteMapNullValue);
|
||||
}
|
||||
}
|
@ -1,2 +1,33 @@
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;public class DorisJsonSerializer {
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||
|
||||
import com.alibaba.datax.common.element.Record;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class DorisJsonSerializer extends DorisBaseSerializer implements DorisSerializer{
|
||||
|
||||
private static final long serialVersionUID = 1L;
|
||||
|
||||
private final List<String> fieldNames;
|
||||
|
||||
public DorisJsonSerializer( List<String> fieldNames) {
|
||||
this.fieldNames = fieldNames;
|
||||
}
|
||||
|
||||
@Override
|
||||
public String serialize( Record row) {
|
||||
if (null == fieldNames) {
|
||||
return "";
|
||||
}
|
||||
Map<String, Object> rowMap = new HashMap<> (fieldNames.size());
|
||||
int idx = 0;
|
||||
for (String fieldName : fieldNames) {
|
||||
rowMap.put(fieldName, fieldConvertion(row.getColumn(idx)));
|
||||
idx++;
|
||||
}
|
||||
return JSON.toJSONString(rowMap);
|
||||
}
|
||||
}
|
||||
|
@ -1,2 +1,10 @@
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;public class DorisSerializer {
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||
|
||||
import com.alibaba.datax.common.element.Record;
|
||||
|
||||
import java.io.Serializable;
|
||||
|
||||
/**
 * Converts one DataX record into the text of a single row for a load request.
 */
public interface DorisSerializer extends Serializable {

    /**
     * Serializes a record.
     *
     * @param row the record to serialize
     * @return the serialized row
     */
    String serialize(Record row);
}
|
||||
|
@ -1,2 +1,19 @@
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;public class DorisSerializerFactory {
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||
|
||||
import java.util.Map;
|
||||
|
||||
public class DorisSerializerFactory {
|
||||
public DorisSerializerFactory(){
|
||||
|
||||
}
|
||||
public static DorisSerializer createSerializer(DorisWriterOptions writerOptions) {
|
||||
if (DorisWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
|
||||
Map<String, Object> props = writerOptions.getLoadProps();
|
||||
return new DorisCsvSerializer(null == props || !props.containsKey("column_separator") ? null : String.valueOf(props.get("column_separator")));
|
||||
}
|
||||
if (DorisWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
|
||||
return new DorisJsonSerializer(writerOptions.getColumns());
|
||||
}
|
||||
throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties.");
|
||||
}
|
||||
}
|
||||
|
@ -1,2 +1,29 @@
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;public class DorisStreamLoadExcetion {
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
|
||||
/**
 * I/O failure raised for a stream load; carries the response map of the
 * failed request and a flag telling whether a new label should be created.
 */
public class DorisStreamLoadExcetion extends IOException {

    private final Map<String, Object> response;
    private final boolean reCreateLabel;

    /**
     * Creates an exception that does not ask for a new label.
     *
     * @param message  the detail message
     * @param response the response of the failed request, may be {@code null}
     */
    public DorisStreamLoadExcetion(String message, Map<String, Object> response) {
        this(message, response, false);
    }

    /**
     * @param message       the detail message
     * @param response      the response of the failed request, may be {@code null}
     * @param reCreateLabel value later reported by {@link #needReCreateLabel()}
     */
    public DorisStreamLoadExcetion(String message, Map<String, Object> response, boolean reCreateLabel) {
        super(message);
        this.response = response;
        this.reCreateLabel = reCreateLabel;
    }

    /** @return the response map passed at construction */
    public Map<String, Object> getFailedResponse() {
        return this.response;
    }

    /** @return the re-create-label flag passed at construction, {@code false} by default */
    public boolean needReCreateLabel() {
        return this.reCreateLabel;
    }
}
|
||||
|
@ -0,0 +1,233 @@
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.http.HttpEntity;
|
||||
import org.apache.http.client.config.RequestConfig;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.client.methods.HttpPut;
|
||||
import org.apache.http.entity.ByteArrayEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.DefaultRedirectStrategy;
|
||||
import org.apache.http.impl.client.HttpClientBuilder;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.HttpURLConnection;
|
||||
import java.net.URL;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class DorisStreamLoadVisitor {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadVisitor.class);
|
||||
|
||||
private DorisWriterOptions options;
|
||||
|
||||
private long pos;
|
||||
private static final String RESULT_FAILED = "Fail";
|
||||
private static final String RESULT_LABEL_EXISTED = "Label Already Exists";
|
||||
private static final String LAEBL_STATE_VISIBLE = "VISIBLE";
|
||||
private static final String LAEBL_STATE_COMMITTED = "COMMITTED";
|
||||
private static final String RESULT_LABEL_PREPARE = "PREPARE";
|
||||
private static final String RESULT_LABEL_ABORTED = "ABORTED";
|
||||
private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";
|
||||
|
||||
|
||||
public DorisStreamLoadVisitor(DorisWriterOptions options){
|
||||
this.options = options;
|
||||
}
|
||||
|
||||
public void streamLoad(DorisWriterTuple data) throws Exception {
|
||||
String host = getAvailableHost();
|
||||
if(host == null){
|
||||
throw new IOException ("None of the host in `load_url` could be connected.");
|
||||
}
|
||||
String loadUrl = new StringBuilder(host)
|
||||
.append("/api/")
|
||||
.append(options.getDatabase())
|
||||
.append("/")
|
||||
.append(options.getTable())
|
||||
.append("/_stream_load")
|
||||
.toString();
|
||||
LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", data.getRows().size(), data.getBytes(), data.getLabel()));
|
||||
Map<String, Object> loadResult = doHttpPut(loadUrl, data.getLabel(), joinRows(data.getRows(), data.getBytes().intValue()));
|
||||
final String keyStatus = "Status";
|
||||
if (null == loadResult || !loadResult.containsKey(keyStatus)) {
|
||||
throw new IOException("Unable to flush data to Doris: unknown result status.");
|
||||
}
|
||||
LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString());
|
||||
if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {
|
||||
throw new IOException(
|
||||
new StringBuilder("Failed to flush data to Doris.\n").append(JSON.toJSONString(loadResult)).toString()
|
||||
);
|
||||
} else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
|
||||
LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString());
|
||||
// has to block-checking the state to get the final result
|
||||
checkLabelState(host, data.getLabel());
|
||||
}
|
||||
}
|
||||
|
||||
private void checkLabelState(String host, String label) throws IOException {
|
||||
int idx = 0;
|
||||
while(true) {
|
||||
try {
|
||||
TimeUnit.SECONDS.sleep(Math.min(++idx, 5));
|
||||
} catch (InterruptedException ex) {
|
||||
break;
|
||||
}
|
||||
try (CloseableHttpClient httpclient = HttpClients.createDefault()) {
|
||||
HttpGet httpGet = new HttpGet(new StringBuilder(host).append("/api/").append(options.getDatabase()).append("/get_load_state?label=").append(label).toString());
|
||||
httpGet.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));
|
||||
httpGet.setHeader("Connection", "close");
|
||||
|
||||
try (CloseableHttpResponse resp = httpclient.execute(httpGet)) {
|
||||
HttpEntity respEntity = getHttpEntity(resp);
|
||||
if (respEntity == null) {
|
||||
throw new IOException(String.format("Failed to flush data to Doris, Error " +
|
||||
"could not get the final state of label[%s].\n", label), null);
|
||||
}
|
||||
Map<String, Object> result = (Map<String, Object>)JSON.parse(EntityUtils.toString(respEntity));
|
||||
String labelState = (String)result.get("state");
|
||||
if (null == labelState) {
|
||||
throw new IOException(String.format("Failed to flush data to Doris, Error " +
|
||||
"could not get the final state of label[%s]. response[%s]\n", label, EntityUtils.toString(respEntity)), null);
|
||||
}
|
||||
LOG.info(String.format("Checking label[%s] state[%s]\n", label, labelState));
|
||||
switch(labelState) {
|
||||
case LAEBL_STATE_VISIBLE:
|
||||
case LAEBL_STATE_COMMITTED:
|
||||
return;
|
||||
case RESULT_LABEL_PREPARE:
|
||||
continue;
|
||||
case RESULT_LABEL_ABORTED:
|
||||
throw new DorisStreamLoadExcetion(String.format("Failed to flush data to Doris, Error " +
|
||||
"label[%s] state[%s]\n", label, labelState), null, true);
|
||||
case RESULT_LABEL_UNKNOWN:
|
||||
default:
|
||||
throw new IOException(String.format("Failed to flush data to Doris, Error " +
|
||||
"label[%s] state[%s]\n", label, labelState), null);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private byte[] joinRows(List<byte[]> rows, int totalBytes) {
|
||||
if (DorisWriterOptions.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
|
||||
Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<> () : options.getLoadProps());
|
||||
byte[] lineDelimiter = DorisDelimiterParser.parse((String)props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
|
||||
ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
|
||||
for (byte[] row : rows) {
|
||||
bos.put(row);
|
||||
bos.put(lineDelimiter);
|
||||
}
|
||||
return bos.array();
|
||||
}
|
||||
|
||||
if (DorisWriterOptions.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
|
||||
ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
|
||||
bos.put("[".getBytes(StandardCharsets.UTF_8));
|
||||
byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
|
||||
boolean isFirstElement = true;
|
||||
for (byte[] row : rows) {
|
||||
if (!isFirstElement) {
|
||||
bos.put(jsonDelimiter);
|
||||
}
|
||||
bos.put(row);
|
||||
isFirstElement = false;
|
||||
}
|
||||
bos.put("]".getBytes(StandardCharsets.UTF_8));
|
||||
return bos.array();
|
||||
}
|
||||
throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
|
||||
}
|
||||
private Map<String, Object> doHttpPut(String loadUrl, String label, byte[] data) throws IOException {
|
||||
LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
|
||||
final HttpClientBuilder httpClientBuilder = HttpClients.custom()
|
||||
.setRedirectStrategy(new DefaultRedirectStrategy () {
|
||||
@Override
|
||||
protected boolean isRedirectable(String method) {
|
||||
return true;
|
||||
}
|
||||
});
|
||||
try ( CloseableHttpClient httpclient = httpClientBuilder.build()) {
|
||||
HttpPut httpPut = new HttpPut(loadUrl);
|
||||
List<String> cols = options.getColumns();
|
||||
if (null != cols && !cols.isEmpty() && DorisWriterOptions.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
|
||||
httpPut.setHeader("columns", String.join(",", cols.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList())));
|
||||
}
|
||||
if (null != options.getLoadProps()) {
|
||||
for (Map.Entry<String, Object> entry : options.getLoadProps().entrySet()) {
|
||||
httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
|
||||
}
|
||||
}
|
||||
httpPut.setHeader("Expect", "100-continue");
|
||||
httpPut.setHeader("label", label);
|
||||
httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded");
|
||||
httpPut.setHeader("Authorization", getBasicAuthHeader(options.getUsername(), options.getPassword()));
|
||||
httpPut.setEntity(new ByteArrayEntity (data));
|
||||
httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build());
|
||||
try ( CloseableHttpResponse resp = httpclient.execute(httpPut)) {
|
||||
HttpEntity respEntity = getHttpEntity(resp);
|
||||
if (respEntity == null)
|
||||
return null;
|
||||
return (Map<String, Object>)JSON.parse(EntityUtils.toString(respEntity));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private String getBasicAuthHeader(String username, String password) {
|
||||
String auth = username + ":" + password;
|
||||
byte[] encodedAuth = Base64.encodeBase64(auth.getBytes(StandardCharsets.UTF_8));
|
||||
return new StringBuilder("Basic ").append(new String(encodedAuth)).toString();
|
||||
}
|
||||
|
||||
private HttpEntity getHttpEntity(CloseableHttpResponse resp) {
|
||||
int code = resp.getStatusLine().getStatusCode();
|
||||
if (200 != code) {
|
||||
LOG.warn("Request failed with code:{}", code);
|
||||
return null;
|
||||
}
|
||||
HttpEntity respEntity = resp.getEntity();
|
||||
if (null == respEntity) {
|
||||
LOG.warn("Request failed with empty response.");
|
||||
return null;
|
||||
}
|
||||
return respEntity;
|
||||
}
|
||||
|
||||
private String getAvailableHost() {
|
||||
List<String> hostList = options.getLoadUrlList();
|
||||
long tmp = pos + hostList.size();
|
||||
for (; pos < tmp; pos++) {
|
||||
String host = new StringBuilder("http://").append(hostList.get((int) (pos % hostList.size()))).toString();
|
||||
if (tryHttpConnection(host)) {
|
||||
return host;
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private boolean tryHttpConnection(String host) {
|
||||
try {
|
||||
URL url = new URL(host);
|
||||
HttpURLConnection co = (HttpURLConnection) url.openConnection();
|
||||
co.setConnectTimeout(5000);
|
||||
co.connect();
|
||||
co.disconnect();
|
||||
return true;
|
||||
} catch (Exception e1) {
|
||||
LOG.warn("Failed to connect to address:{}", host, e1);
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,105 @@
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||
|
||||
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
|
||||
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
||||
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
|
||||
import com.alibaba.datax.plugin.rdbms.writer.Constant;
|
||||
import com.alibaba.druid.sql.parser.ParserException;
|
||||
import com.google.common.base.Strings;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.sql.Connection;
|
||||
import java.sql.ResultSet;
|
||||
import java.sql.Statement;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
/**
|
||||
* jdbc util
|
||||
*/
|
||||
public class DorisUtil {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DorisUtil.class);
|
||||
|
||||
private DorisUtil() {}
|
||||
|
||||
public static List<String> getDorisTableColumns( Connection conn, String databaseName, String tableName) {
|
||||
String currentSql = String.format("SELECT COLUMN_NAME FROM `information_schema`.`COLUMNS` WHERE `TABLE_SCHEMA` = '%s' AND `TABLE_NAME` = '%s' ORDER BY `ORDINAL_POSITION` ASC;", databaseName, tableName);
|
||||
List<String> columns = new ArrayList<> ();
|
||||
ResultSet rs = null;
|
||||
try {
|
||||
rs = DBUtil.query(conn, currentSql);
|
||||
while (DBUtil.asyncResultSetNext(rs)) {
|
||||
String colName = rs.getString("COLUMN_NAME");
|
||||
columns.add(colName);
|
||||
}
|
||||
return columns;
|
||||
} catch (Exception e) {
|
||||
throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
|
||||
} finally {
|
||||
DBUtil.closeDBResources(rs, null, null);
|
||||
}
|
||||
}
|
||||
|
||||
public static List<String> renderPreOrPostSqls(List<String> preOrPostSqls, String tableName) {
|
||||
if (null == preOrPostSqls) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
List<String> renderedSqls = new ArrayList<>();
|
||||
for (String sql : preOrPostSqls) {
|
||||
if (! Strings.isNullOrEmpty(sql)) {
|
||||
renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName));
|
||||
}
|
||||
}
|
||||
return renderedSqls;
|
||||
}
|
||||
|
||||
public static void executeSqls(Connection conn, List<String> sqls) {
|
||||
Statement stmt = null;
|
||||
String currentSql = null;
|
||||
try {
|
||||
stmt = conn.createStatement();
|
||||
for (String sql : sqls) {
|
||||
currentSql = sql;
|
||||
DBUtil.executeSqlWithoutResultSet(stmt, sql);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
|
||||
} finally {
|
||||
DBUtil.closeDBResources(null, stmt, null);
|
||||
}
|
||||
}
|
||||
|
||||
public static void preCheckPrePareSQL(DorisWriterOptions options) {
|
||||
String table = options.getTable();
|
||||
List<String> preSqls = options.getPreSqlList();
|
||||
List<String> renderedPreSqls = DorisUtil.renderPreOrPostSqls(preSqls, table);
|
||||
if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
|
||||
LOG.info("Begin to preCheck preSqls:[{}].", String.join(";", renderedPreSqls));
|
||||
for (String sql : renderedPreSqls) {
|
||||
try {
|
||||
DBUtil.sqlValid(sql, DataBaseType.MySql);
|
||||
} catch ( ParserException e) {
|
||||
throw RdbmsException.asPreSQLParserException(DataBaseType.MySql,e,sql);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static void preCheckPostSQL(DorisWriterOptions options) {
|
||||
String table = options.getTable();
|
||||
List<String> postSqls = options.getPostSqlList();
|
||||
List<String> renderedPostSqls = DorisUtil.renderPreOrPostSqls(postSqls, table);
|
||||
if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
|
||||
LOG.info("Begin to preCheck postSqls:[{}].", String.join(";", renderedPostSqls));
|
||||
for(String sql : renderedPostSqls) {
|
||||
try {
|
||||
DBUtil.sqlValid(sql, DataBaseType.MySql);
|
||||
} catch (ParserException e){
|
||||
throw RdbmsException.asPostSQLParserException(DataBaseType.MySql,e,sql);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -39,131 +39,41 @@ import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* doris data writer
|
||||
*/
|
||||
public class DorisWriter extends Writer {
|
||||
public DorisWriter() {
|
||||
}
|
||||
|
||||
public static class Task extends com.alibaba.datax.common.spi.Writer.Task {
|
||||
private DorisWriterEmitter dorisWriterEmitter;
|
||||
private Key keys;
|
||||
private DorisCodec rowCodec;
|
||||
private int batchNum = 0;
|
||||
private String labelPrefix;
|
||||
public static class Job extends Writer.Job {
|
||||
|
||||
public Task() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
this.keys = new Key(super.getPluginJobConf());
|
||||
if (Key.DEFAULT_FORMAT_CSV.equalsIgnoreCase(this.keys.getFormat())) {
|
||||
this.rowCodec = new DorisCsvCodec(this.keys.getColumns(), this.keys.getColumnSeparator());
|
||||
} else {
|
||||
this.rowCodec = new DorisJsonCodec(this.keys.getColumns());
|
||||
}
|
||||
this.labelPrefix = this.keys.getLabelPrefix() + UUID.randomUUID();
|
||||
this.dorisWriterEmitter = new DorisWriterEmitter(keys);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prepare() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startWrite(RecordReceiver recordReceiver) {
|
||||
String lineDelimiter = this.keys.getLineDelimiter();
|
||||
DorisFlushBatch flushBatch = new DorisFlushBatch(lineDelimiter, this.keys.getFormat());
|
||||
long batchCount = 0;
|
||||
long batchByteSize = 0L;
|
||||
Record record;
|
||||
// loop to get record from datax
|
||||
while ((record = recordReceiver.getFromReader()) != null) {
|
||||
// check column size
|
||||
if (record.getColumnNumber() != this.keys.getColumns().size()) {
|
||||
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
|
||||
String.format("config writer column info error. because the column number of reader is :%s" +
|
||||
"and the column number of writer is:%s. please check you datax job config json.",
|
||||
record.getColumnNumber(), this.keys.getColumns().size()));
|
||||
}
|
||||
// codec record
|
||||
final String recordStr = this.rowCodec.serialize(record);
|
||||
|
||||
// put into buffer
|
||||
flushBatch.putData(recordStr);
|
||||
batchCount += 1;
|
||||
batchByteSize += recordStr.length();
|
||||
// trigger buffer
|
||||
if (batchCount >= this.keys.getBatchRows() || batchByteSize >= this.keys.getBatchByteSize()) {
|
||||
// generate doris stream load label
|
||||
flush (flushBatch);
|
||||
// clear buffer
|
||||
batchCount = 0;
|
||||
batchByteSize = 0L;
|
||||
flushBatch = new DorisFlushBatch (lineDelimiter, this.keys.getFormat());
|
||||
}
|
||||
} // end of while
|
||||
|
||||
if (flushBatch.getSize() > 0) {
|
||||
flush(flushBatch);
|
||||
}
|
||||
}
|
||||
|
||||
private void flush(DorisFlushBatch flushBatch) {
|
||||
flushBatch.setLabel(getStreamLoadLabel());
|
||||
dorisWriterEmitter.emit(flushBatch);
|
||||
}
|
||||
|
||||
private String getStreamLoadLabel() {
|
||||
return labelPrefix + "_" + (batchNum++);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void post() {
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean supportFailOver() {
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
public static class Job extends com.alibaba.datax.common.spi.Writer.Job {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DorisWriter.Job.class);
|
||||
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
|
||||
private Configuration originalConfig = null;
|
||||
private Key keys;
|
||||
|
||||
public Job() {
|
||||
}
|
||||
private DorisWriterOptions options;
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
this.originalConfig = super.getPluginJobConf();
|
||||
this.keys = new Key(super.getPluginJobConf());
|
||||
this.keys.doPretreatment();
|
||||
options = new DorisWriterOptions(super.getPluginJobConf());
|
||||
options.doPretreatment();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void preCheck() {
|
||||
public void preCheck(){
|
||||
this.init();
|
||||
this.preCheckPrePareSQL(this.keys);
|
||||
this.preCheckPostSQL(this.keys);
|
||||
DorisUtil.preCheckPrePareSQL(options);
|
||||
DorisUtil.preCheckPostSQL(options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prepare() {
|
||||
String username = this.keys.getUsername();
|
||||
String password = this.keys.getPassword();
|
||||
String jdbcUrl = this.keys.getJdbcUrl();
|
||||
List<String> renderedPreSqls = this.renderPreOrPostSqls(this.keys.getPreSqlList(), this.keys.getTable());
|
||||
if (!renderedPreSqls.isEmpty()) {
|
||||
String username = options.getUsername();
|
||||
String password = options.getPassword();
|
||||
String jdbcUrl = options.getJdbcUrl();
|
||||
List<String> renderedPreSqls = DorisUtil.renderPreOrPostSqls(options.getPreSqlList(), options.getTable());
|
||||
if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) {
|
||||
Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
|
||||
LOG.info("prepare execute preSqls:[{}]. doris jdbc url:{}.", String.join(";", renderedPreSqls), jdbcUrl);
|
||||
this.executeSqls(conn, renderedPreSqls);
|
||||
LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPreSqls), jdbcUrl);
|
||||
DorisUtil.executeSqls(conn, renderedPreSqls);
|
||||
DBUtil.closeDBResources(null, null, conn);
|
||||
}
|
||||
}
|
||||
@ -171,93 +81,89 @@ public class DorisWriter extends Writer {
|
||||
@Override
|
||||
public List<Configuration> split(int mandatoryNumber) {
|
||||
List<Configuration> configurations = new ArrayList<>(mandatoryNumber);
|
||||
|
||||
for (int i = 0; i < mandatoryNumber; ++i) {
|
||||
configurations.add(this.originalConfig);
|
||||
for (int i = 0; i < mandatoryNumber; i++) {
|
||||
configurations.add(originalConfig);
|
||||
}
|
||||
|
||||
return configurations;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void post() {
|
||||
String username = this.keys.getUsername();
|
||||
String password = this.keys.getPassword();
|
||||
String jdbcUrl = this.keys.getJdbcUrl();
|
||||
List<String> renderedPostSqls = this.renderPreOrPostSqls(this.keys.getPostSqlList(), this.keys.getTable());
|
||||
if (!renderedPostSqls.isEmpty()) {
|
||||
String username = options.getUsername();
|
||||
String password = options.getPassword();
|
||||
String jdbcUrl = options.getJdbcUrl();
|
||||
LOG.info("userName :{},password:{},jdbcUrl:{}.", username,password,jdbcUrl);
|
||||
List<String> renderedPostSqls = DorisUtil.renderPreOrPostSqls(options.getPostSqlList(), options.getTable());
|
||||
if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
|
||||
Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
|
||||
LOG.info("prepare execute postSqls:[{}]. doris jdbc url为:{}.", String.join(";", renderedPostSqls), jdbcUrl);
|
||||
this.executeSqls(conn, renderedPostSqls);
|
||||
LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPostSqls), jdbcUrl);
|
||||
DorisUtil.executeSqls(conn, renderedPostSqls);
|
||||
DBUtil.closeDBResources(null, null, conn);
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Override
|
||||
public void destroy() {
|
||||
}
|
||||
|
||||
private List<String> renderPreOrPostSqls(final List<String> preOrPostSqls, final String tableName) {
|
||||
if (null == preOrPostSqls) {
|
||||
return Collections.emptyList();
|
||||
}
|
||||
final List<String> renderedSqls = new ArrayList<>();
|
||||
for (final String sql : preOrPostSqls) {
|
||||
if (!Strings.isNullOrEmpty(sql)) {
|
||||
renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName));
|
||||
}
|
||||
}
|
||||
return renderedSqls;
|
||||
}
|
||||
|
||||
private void executeSqls(final Connection conn, final List<String> sqls) {
|
||||
Statement stmt = null;
|
||||
String currentSql = null;
|
||||
public static class Task extends Writer.Task {
|
||||
private DorisWriterManager writerManager;
|
||||
private DorisWriterOptions options;
|
||||
private DorisSerializer rowSerializer;
|
||||
|
||||
@Override
|
||||
public void init() {
|
||||
options = new DorisWriterOptions(super.getPluginJobConf());
|
||||
if (options.isWildcardColumn()) {
|
||||
Connection conn = DBUtil.getConnection(DataBaseType.MySql, options.getJdbcUrl(), options.getUsername(), options.getPassword());
|
||||
List<String> columns = DorisUtil.getDorisTableColumns(conn, options.getDatabase(), options.getTable());
|
||||
options.setInfoCchemaColumns(columns);
|
||||
}
|
||||
writerManager = new DorisWriterManager(options);
|
||||
rowSerializer = DorisSerializerFactory.createSerializer(options);
|
||||
}
|
||||
|
||||
@Override
|
||||
public void prepare() {
|
||||
}
|
||||
|
||||
public void startWrite(RecordReceiver recordReceiver) {
|
||||
try {
|
||||
stmt = conn.createStatement();
|
||||
for (String s : sqls) {
|
||||
final String sql = currentSql = s;
|
||||
DBUtil.executeSqlWithoutResultSet(stmt, sql);
|
||||
Record record;
|
||||
while ((record = recordReceiver.getFromReader()) != null) {
|
||||
if (record.getColumnNumber() != options.getColumns().size()) {
|
||||
throw DataXException
|
||||
.asDataXException(
|
||||
DBUtilErrorCode.CONF_ERROR,
|
||||
String.format(
|
||||
"列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
|
||||
record.getColumnNumber(),
|
||||
options.getColumns().size()));
|
||||
}
|
||||
writerManager.writeRecord(rowSerializer.serialize(record));
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null);
|
||||
} finally {
|
||||
DBUtil.closeDBResources(null, stmt, null);
|
||||
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void preCheckPrePareSQL(final Key keys) {
|
||||
final String table = keys.getTable();
|
||||
final List<String> preSqls = keys.getPreSqlList();
|
||||
final List<String> renderedPreSqls = renderPreOrPostSqls(preSqls, table);
|
||||
if (!renderedPreSqls.isEmpty()) {
|
||||
LOG.info("prepare check preSqls:[{}].", String.join(";", renderedPreSqls));
|
||||
for (final String sql : renderedPreSqls) {
|
||||
@Override
|
||||
public void post() {
|
||||
try {
|
||||
DBUtil.sqlValid(sql, DataBaseType.MySql);
|
||||
} catch (ParserException e) {
|
||||
throw RdbmsException.asPreSQLParserException(DataBaseType.MySql, e, sql);
|
||||
}
|
||||
}
|
||||
writerManager.close();
|
||||
} catch (Exception e) {
|
||||
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
|
||||
}
|
||||
}
|
||||
|
||||
private void preCheckPostSQL(final Key keys) {
|
||||
final String table = keys.getTable();
|
||||
final List<String> postSqls = keys.getPostSqlList();
|
||||
final List<String> renderedPostSqls = renderPreOrPostSqls(postSqls, table);
|
||||
if (!renderedPostSqls.isEmpty()) {
|
||||
LOG.info("prepare check postSqls:[{}].", String.join(";", renderedPostSqls));
|
||||
for (final String sql : renderedPostSqls) {
|
||||
try {
|
||||
DBUtil.sqlValid(sql, DataBaseType.MySql);
|
||||
} catch (ParserException e) {
|
||||
throw RdbmsException.asPostSQLParserException(DataBaseType.MySql, e, sql);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@Override
|
||||
public void destroy() {}
|
||||
|
||||
@Override
|
||||
public boolean supportFailOver(){
|
||||
return false;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -1,243 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||
|
||||
import com.alibaba.datax.common.exception.DataXException;
|
||||
import com.alibaba.datax.common.util.Configuration;
|
||||
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
|
||||
import com.alibaba.fastjson.JSON;
|
||||
import com.google.common.collect.Lists;
|
||||
import com.google.common.collect.Maps;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.http.HttpEntity;
|
||||
import org.apache.http.HttpHeaders;
|
||||
import org.apache.http.HttpRequest;
|
||||
import org.apache.http.HttpResponse;
|
||||
import org.apache.http.HttpStatus;
|
||||
import org.apache.http.ProtocolException;
|
||||
import org.apache.http.client.config.RequestConfig;
|
||||
import org.apache.http.client.methods.CloseableHttpResponse;
|
||||
import org.apache.http.client.methods.HttpGet;
|
||||
import org.apache.http.client.methods.HttpHead;
|
||||
import org.apache.http.client.methods.HttpPut;
|
||||
import org.apache.http.client.methods.HttpUriRequest;
|
||||
import org.apache.http.client.methods.RequestBuilder;
|
||||
import org.apache.http.entity.ByteArrayEntity;
|
||||
import org.apache.http.impl.client.CloseableHttpClient;
|
||||
import org.apache.http.impl.client.DefaultRedirectStrategy;
|
||||
import org.apache.http.impl.client.HttpClientBuilder;
|
||||
import org.apache.http.impl.client.HttpClients;
|
||||
import org.apache.http.protocol.HttpContext;
|
||||
import org.apache.http.util.EntityUtils;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.net.URI;
|
||||
import java.util.Base64;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
import java.net.URLEncoder;
|
||||
|
||||
// Used to load batch of rows to Doris using stream load
|
||||
public class DorisWriterEmitter {
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DorisWriterEmitter.class);
|
||||
private final Key keys;
|
||||
private int hostPos = 0;
|
||||
private List<String> targetHosts = Lists.newArrayList();
|
||||
|
||||
private RequestConfig requestConfig;
|
||||
|
||||
/**
 * Creates an emitter bound to the given configuration, then prepares the list of
 * target hosts and the shared HTTP request configuration.
 *
 * @param keys writer configuration accessor
 */
public DorisWriterEmitter(final Key keys) {
    this.keys = keys;
    this.initHostList();
    this.initRequestConfig();
}
|
||||
|
||||
private void initRequestConfig() {
|
||||
requestConfig = RequestConfig.custom().setConnectTimeout(this.keys.getConnectTimeout()).build();
|
||||
}
|
||||
|
||||
// get target host from config
|
||||
private void initHostList() {
|
||||
List<String> hosts = this.keys.getBeLoadUrlList();
|
||||
if (hosts == null || hosts.isEmpty()) {
|
||||
hosts = this.keys.getFeLoadUrlList();
|
||||
}
|
||||
if (hosts == null || hosts.isEmpty()) {
|
||||
DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
|
||||
"Either beLoadUrl or feLoadUrl must be set");
|
||||
}
|
||||
for (String beHost : hosts) {
|
||||
targetHosts.add("http://" + beHost);
|
||||
}
|
||||
}
|
||||
|
||||
public void emit(final DorisFlushBatch flushData) {
|
||||
String host = this.getAvailableHost();
|
||||
for (int i = 0; i <= this.keys.getMaxRetries(); i++) {
|
||||
try {
|
||||
doStreamLoad(flushData, host);
|
||||
return;
|
||||
} catch (DataXException ex) {
|
||||
if (i >= this.keys.getMaxRetries()) {
|
||||
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, ex);
|
||||
}
|
||||
LOG.error("StreamLoad error, switch host {} and retry: ", host, ex);
|
||||
host = this.getAvailableHost();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* execute doris stream load
|
||||
*/
|
||||
private void doStreamLoad(final DorisFlushBatch flushData, String host) {
|
||||
long start = System.currentTimeMillis();
|
||||
if (StringUtils.isEmpty(host)) {
|
||||
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, "None of the load url can be connected.");
|
||||
}
|
||||
String loadUrl = host + "/api/" + this.keys.getDatabase() + "/" + this.keys.getTable() + "/_stream_load";
|
||||
// do http put request and get response
|
||||
final Map<String, Object> loadResult;
|
||||
try {
|
||||
loadResult = this.doHttpPut(loadUrl, flushData);
|
||||
} catch (IOException e) {
|
||||
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
|
||||
}
|
||||
|
||||
long cost = System.currentTimeMillis() - start;
|
||||
LOG.info("StreamLoad response: " + JSON.toJSONString(loadResult) + ", cost(ms): " + cost);
|
||||
final String keyStatus = "Status";
|
||||
if (null == loadResult || !loadResult.containsKey(keyStatus)) {
|
||||
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, "Unable to flush data to doris: unknown result status.");
|
||||
}
|
||||
if (loadResult.get(keyStatus).equals("Fail")) {
|
||||
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, "Failed to flush data to doris.\n" + JSON.toJSONString(loadResult));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* loop to get target host
|
||||
*
|
||||
* @return
|
||||
*/
|
||||
private String getAvailableHost() {
|
||||
if (this.hostPos >= targetHosts.size()) {
|
||||
this.hostPos = 0;
|
||||
}
|
||||
|
||||
while (this.hostPos < targetHosts.size()) {
|
||||
final String host = targetHosts.get(hostPos);
|
||||
++this.hostPos;
|
||||
return host;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private Map<String, Object> doHttpPut(final String loadUrl, final DorisFlushBatch flushBatch) throws IOException {
|
||||
LOG.info(String.format("Executing stream load to: '%s', size: %s, rows: %d",
|
||||
loadUrl, flushBatch.getSize(), flushBatch.getRows()));
|
||||
|
||||
final HttpClientBuilder httpClientBuilder = HttpClients.custom().setRedirectStrategy(new DefaultRedirectStrategy() {
|
||||
@Override
|
||||
protected boolean isRedirectable(final String method) {
|
||||
return true;
|
||||
}
|
||||
|
||||
@Override
|
||||
public HttpUriRequest getRedirect(HttpRequest request, HttpResponse response, HttpContext context) throws ProtocolException {
|
||||
URI uri = this.getLocationURI(request, response, context);
|
||||
String method = request.getRequestLine().getMethod();
|
||||
if (method.equalsIgnoreCase("HEAD")) {
|
||||
return new HttpHead(uri);
|
||||
} else if (method.equalsIgnoreCase("GET")) {
|
||||
return new HttpGet(uri);
|
||||
} else {
|
||||
int status = response.getStatusLine().getStatusCode();
|
||||
return (HttpUriRequest) (status == 307 ? RequestBuilder.copy(request).setUri(uri).build() : new HttpGet(uri));
|
||||
}
|
||||
}
|
||||
});
|
||||
|
||||
try (final CloseableHttpClient httpclient = httpClientBuilder.build()) {
|
||||
final HttpPut httpPut = new HttpPut(loadUrl);
|
||||
final List<String> cols = this.keys.getColumns();
|
||||
if (null != cols && !cols.isEmpty()) {
|
||||
httpPut.setHeader("columns", String.join(",", cols.stream().map(item -> String.format("`%s`", item.trim().replace("`", ""))).collect(Collectors.toList())));
|
||||
}
|
||||
|
||||
//set default header
|
||||
setDefaultHeader(httpPut);
|
||||
// put custom loadProps to http header
|
||||
final Map<String, Object> loadProps = this.keys.getLoadProps();
|
||||
if (null != loadProps) {
|
||||
for (final Map.Entry<String, Object> entry : loadProps.entrySet()) {
|
||||
httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue()));
|
||||
}
|
||||
}
|
||||
|
||||
// set other required headers
|
||||
httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
|
||||
httpPut.setHeader(HttpHeaders.AUTHORIZATION, this.getBasicAuthHeader(this.keys.getUsername(), this.keys.getPassword()));
|
||||
httpPut.setHeader("label", flushBatch.getLabel());
|
||||
|
||||
// Use ByteArrayEntity instead of StringEntity to handle Chinese correctly
|
||||
httpPut.setEntity(new ByteArrayEntity(flushBatch.getData().getBytes()));
|
||||
httpPut.setConfig(requestConfig);
|
||||
|
||||
try (final CloseableHttpResponse resp = httpclient.execute(httpPut)) {
|
||||
final int code = resp.getStatusLine().getStatusCode();
|
||||
if (HttpStatus.SC_OK != code) {
|
||||
LOG.warn("Request failed with code:{}", code);
|
||||
return null;
|
||||
}
|
||||
final HttpEntity respEntity = resp.getEntity();
|
||||
if (null == respEntity) {
|
||||
LOG.warn("Request failed with empty response.");
|
||||
return null;
|
||||
}
|
||||
return (Map<String, Object>) JSON.parse(EntityUtils.toString(respEntity));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set default request headers in json and csv formats.
|
||||
* csv default delimiters are \x01 and \x02
|
||||
*/
|
||||
private void setDefaultHeader(HttpPut httpPut) {
|
||||
if (Key.DEFAULT_FORMAT_CSV.equalsIgnoreCase(this.keys.getFormat())) {
|
||||
httpPut.setHeader("line_delimiter", this.keys.getLineDelimiter());
|
||||
httpPut.setHeader("column_separator", this.keys.getColumnSeparator());
|
||||
} else {
|
||||
httpPut.setHeader("format", "json");
|
||||
httpPut.setHeader("strip_outer_array", "true");
|
||||
httpPut.setHeader("fuzzy_parse", "true");
|
||||
}
|
||||
}
|
||||
|
||||
private String getBasicAuthHeader(final String username, final String password) {
|
||||
final String auth = username + ":" + password;
|
||||
final byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes());
|
||||
return "Basic " + new String(encodedAuth);
|
||||
}
|
||||
|
||||
}
|
@ -0,0 +1,192 @@
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||
|
||||
import com.google.common.base.Strings;
|
||||
import org.apache.commons.lang3.concurrent.BasicThreadFactory;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.nio.charset.StandardCharsets;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.LinkedBlockingDeque;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
import java.util.concurrent.ScheduledFuture;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
public class DorisWriterManager {
|
||||
|
||||
private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class);
|
||||
|
||||
private final DorisStreamLoadVisitor visitor;
|
||||
private final DorisWriterOptions options;
|
||||
private final List<byte[]> buffer = new ArrayList<> ();
|
||||
private int batchCount = 0;
|
||||
private long batchSize = 0;
|
||||
private volatile boolean closed = false;
|
||||
private volatile Exception flushException;
|
||||
private final LinkedBlockingDeque<DorisWriterTuple> flushQueue;
|
||||
private ScheduledExecutorService scheduler;
|
||||
private ScheduledFuture<?> scheduledFuture;
|
||||
|
||||
public DorisWriterManager(DorisWriterOptions options) {
|
||||
this.options = options;
|
||||
this.visitor = new DorisStreamLoadVisitor(options);
|
||||
flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength());
|
||||
this.startScheduler();
|
||||
this.startAsyncFlushing();
|
||||
}
|
||||
|
||||
public void startScheduler() {
|
||||
stopScheduler();
|
||||
this.scheduler = Executors.newScheduledThreadPool(1, new BasicThreadFactory.Builder().namingPattern("Doris-interval-flush").daemon(true).build());
|
||||
this.scheduledFuture = this.scheduler.schedule(() -> {
|
||||
synchronized (DorisWriterManager.this) {
|
||||
if (!closed) {
|
||||
try {
|
||||
String label = createBatchLabel();
|
||||
LOG.info(String.format("Doris interval Sinking triggered: label[%s].", label));
|
||||
if (batchCount == 0) {
|
||||
startScheduler();
|
||||
}
|
||||
flush(label, false);
|
||||
} catch (Exception e) {
|
||||
flushException = e;
|
||||
}
|
||||
}
|
||||
}
|
||||
}, options.getFlushInterval(), TimeUnit.MILLISECONDS);
|
||||
}
|
||||
|
||||
public void stopScheduler() {
|
||||
if (this.scheduledFuture != null) {
|
||||
scheduledFuture.cancel(false);
|
||||
this.scheduler.shutdown();
|
||||
}
|
||||
}
|
||||
|
||||
public final synchronized void writeRecord(String record) throws IOException {
|
||||
checkFlushException();
|
||||
try {
|
||||
byte[] bts = record.getBytes(StandardCharsets.UTF_8);
|
||||
buffer.add(bts);
|
||||
batchCount++;
|
||||
batchSize += bts.length;
|
||||
if (batchCount >= options.getBatchRows() || batchSize >= options.getBatchSize()) {
|
||||
String label = createBatchLabel();
|
||||
LOG.debug(String.format("Doris buffer Sinking triggered: rows[%d] label[%s].", batchCount, label));
|
||||
flush(label, false);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
throw new IOException("Writing records to Doris failed.", e);
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized void flush(String label, boolean waitUtilDone) throws Exception {
|
||||
checkFlushException();
|
||||
if (batchCount == 0) {
|
||||
if (waitUtilDone) {
|
||||
waitAsyncFlushingDone();
|
||||
}
|
||||
return;
|
||||
}
|
||||
flushQueue.put(new DorisWriterTuple(label, batchSize, new ArrayList<>(buffer)));
|
||||
if (waitUtilDone) {
|
||||
// wait the last flush
|
||||
waitAsyncFlushingDone();
|
||||
}
|
||||
buffer.clear();
|
||||
batchCount = 0;
|
||||
batchSize = 0;
|
||||
}
|
||||
|
||||
public synchronized void close() {
|
||||
if (!closed) {
|
||||
closed = true;
|
||||
try {
|
||||
String label = createBatchLabel();
|
||||
if (batchCount > 0) LOG.debug(String.format("Doris Sink is about to close: label[%s].", label));
|
||||
flush(label, true);
|
||||
} catch (Exception e) {
|
||||
throw new RuntimeException("Writing records to Doris failed.", e);
|
||||
}
|
||||
}
|
||||
checkFlushException();
|
||||
}
|
||||
|
||||
public String createBatchLabel() {
|
||||
StringBuilder sb = new StringBuilder();
|
||||
if (! Strings.isNullOrEmpty(options.getLabelPrefix())) {
|
||||
sb.append(options.getLabelPrefix());
|
||||
}
|
||||
return sb.append(UUID.randomUUID().toString())
|
||||
.toString();
|
||||
}
|
||||
|
||||
private void startAsyncFlushing() {
|
||||
// start flush thread
|
||||
Thread flushThread = new Thread(new Runnable(){
|
||||
public void run() {
|
||||
while(true) {
|
||||
try {
|
||||
asyncFlush();
|
||||
} catch (Exception e) {
|
||||
flushException = e;
|
||||
}
|
||||
}
|
||||
}
|
||||
});
|
||||
flushThread.setDaemon(true);
|
||||
flushThread.start();
|
||||
}
|
||||
|
||||
private void waitAsyncFlushingDone() throws InterruptedException {
|
||||
// wait previous flushings
|
||||
for (int i = 0; i <= options.getFlushQueueLength(); i++) {
|
||||
flushQueue.put(new DorisWriterTuple("", 0l, null));
|
||||
}
|
||||
checkFlushException();
|
||||
}
|
||||
|
||||
private void asyncFlush() throws Exception {
|
||||
DorisWriterTuple flushData = flushQueue.take();
|
||||
if (Strings.isNullOrEmpty(flushData.getLabel())) {
|
||||
return;
|
||||
}
|
||||
stopScheduler();
|
||||
LOG.debug(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel()));
|
||||
for (int i = 0; i <= options.getMaxRetries(); i++) {
|
||||
try {
|
||||
// flush to Doris with stream load
|
||||
visitor.streamLoad(flushData);
|
||||
LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel()));
|
||||
startScheduler();
|
||||
break;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to flush batch data to Doris, retry times = {}", i, e);
|
||||
if (i >= options.getMaxRetries()) {
|
||||
throw new IOException(e);
|
||||
}
|
||||
if (e instanceof DorisStreamLoadExcetion && ((DorisStreamLoadExcetion)e).needReCreateLabel()) {
|
||||
String newLabel = createBatchLabel();
|
||||
LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel));
|
||||
flushData.setLabel(newLabel);
|
||||
}
|
||||
try {
|
||||
Thread.sleep(1000l * Math.min(i + 1, 10));
|
||||
} catch (InterruptedException ex) {
|
||||
Thread.currentThread().interrupt();
|
||||
throw new IOException("Unable to flush, interrupted while doing another attempt", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void checkFlushException() {
|
||||
if (flushException != null) {
|
||||
throw new RuntimeException("Writing records to Doris failed.", flushException);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,174 @@
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||
|
||||
import com.alibaba.datax.common.exception.DataXException;
|
||||
import com.alibaba.datax.common.util.Configuration;
|
||||
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class DorisWriterOptions implements Serializable {
|
||||
|
||||
private static final long serialVersionUID = 1l;
|
||||
private static final long KILO_BYTES_SCALE = 1024l;
|
||||
private static final long MEGA_BYTES_SCALE = KILO_BYTES_SCALE * KILO_BYTES_SCALE;
|
||||
private static final int MAX_RETRIES = 3;
|
||||
private static final int BATCH_ROWS = 500000;
|
||||
private static final long BATCH_BYTES = 90 * MEGA_BYTES_SCALE;
|
||||
private static final long FLUSH_INTERVAL = 300000;
|
||||
|
||||
private static final String KEY_LOAD_PROPS_FORMAT = "format";
|
||||
/** Payload formats for stream load. */
public enum StreamLoadFormat {
    CSV,
    JSON
}
|
||||
|
||||
private static final String KEY_USERNAME = "username";
|
||||
private static final String KEY_PASSWORD = "password";
|
||||
private static final String KEY_DATABASE = "connection[0].database";
|
||||
private static final String KEY_TABLE = "connection[0].table[0]";
|
||||
private static final String KEY_COLUMN = "column";
|
||||
private static final String KEY_PRE_SQL = "preSql";
|
||||
private static final String KEY_POST_SQL = "postSql";
|
||||
private static final String KEY_JDBC_URL = "connection[0].jdbcUrl";
|
||||
private static final String KEY_LABEL_PREFIX = "labelPrefix";
|
||||
private static final String KEY_MAX_BATCH_ROWS = "maxBatchRows";
|
||||
private static final String KEY_MAX_BATCH_SIZE = "maxBatchSize";
|
||||
private static final String KEY_FLUSH_INTERVAL = "flushInterval";
|
||||
private static final String KEY_LOAD_URL = "loadUrl";
|
||||
private static final String KEY_FLUSH_QUEUE_LENGTH = "flushQueueLength";
|
||||
private static final String KEY_LOAD_PROPS = "loadProps";
|
||||
|
||||
private final Configuration options;
|
||||
private List<String> infoCchemaColumns;
|
||||
private List<String> userSetColumns;
|
||||
private boolean isWildcardColumn;
|
||||
|
||||
public DorisWriterOptions(Configuration options) {
|
||||
this.options = options;
|
||||
this.userSetColumns = options.getList(KEY_COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList());
|
||||
if (1 == options.getList(KEY_COLUMN, String.class).size() && "*".trim().equals(options.getList(KEY_COLUMN, String.class).get(0))) {
|
||||
this.isWildcardColumn = true;
|
||||
}
|
||||
}
|
||||
|
||||
public void doPretreatment() {
|
||||
validateRequired();
|
||||
validateStreamLoadUrl();
|
||||
}
|
||||
|
||||
public String getJdbcUrl() {
|
||||
return options.getString(KEY_JDBC_URL);
|
||||
}
|
||||
|
||||
public String getDatabase() {
|
||||
return options.getString(KEY_DATABASE);
|
||||
}
|
||||
|
||||
public String getTable() {
|
||||
return options.getString(KEY_TABLE);
|
||||
}
|
||||
|
||||
public String getUsername() {
|
||||
return options.getString(KEY_USERNAME);
|
||||
}
|
||||
|
||||
public String getPassword() {
|
||||
return options.getString(KEY_PASSWORD);
|
||||
}
|
||||
|
||||
public String getLabelPrefix() {
|
||||
return options.getString(KEY_LABEL_PREFIX);
|
||||
}
|
||||
|
||||
public List<String> getLoadUrlList() {
|
||||
return options.getList(KEY_LOAD_URL, String.class);
|
||||
}
|
||||
|
||||
public List<String> getColumns() {
|
||||
if (isWildcardColumn) {
|
||||
return this.infoCchemaColumns;
|
||||
}
|
||||
return this.userSetColumns;
|
||||
}
|
||||
|
||||
public boolean isWildcardColumn() {
|
||||
return this.isWildcardColumn;
|
||||
}
|
||||
|
||||
public void setInfoCchemaColumns(List<String> cols) {
|
||||
this.infoCchemaColumns = cols;
|
||||
}
|
||||
|
||||
public List<String> getPreSqlList() {
|
||||
return options.getList(KEY_PRE_SQL, String.class);
|
||||
}
|
||||
|
||||
public List<String> getPostSqlList() {
|
||||
return options.getList(KEY_POST_SQL, String.class);
|
||||
}
|
||||
|
||||
public Map<String, Object> getLoadProps() {
|
||||
return options.getMap(KEY_LOAD_PROPS);
|
||||
}
|
||||
|
||||
public int getMaxRetries() {
|
||||
return MAX_RETRIES;
|
||||
}
|
||||
|
||||
public int getBatchRows() {
|
||||
Integer rows = options.getInt(KEY_MAX_BATCH_ROWS);
|
||||
return null == rows ? BATCH_ROWS : rows;
|
||||
}
|
||||
|
||||
public long getBatchSize() {
|
||||
Long size = options.getLong(KEY_MAX_BATCH_SIZE);
|
||||
return null == size ? BATCH_BYTES : size;
|
||||
}
|
||||
|
||||
public long getFlushInterval() {
|
||||
Long interval = options.getLong(KEY_FLUSH_INTERVAL);
|
||||
return null == interval ? FLUSH_INTERVAL : interval;
|
||||
}
|
||||
|
||||
public int getFlushQueueLength() {
|
||||
Integer len = options.getInt(KEY_FLUSH_QUEUE_LENGTH);
|
||||
return null == len ? 1 : len;
|
||||
}
|
||||
|
||||
public StreamLoadFormat getStreamLoadFormat() {
|
||||
Map<String, Object> loadProps = getLoadProps();
|
||||
if (null == loadProps) {
|
||||
return StreamLoadFormat.CSV;
|
||||
}
|
||||
if (loadProps.containsKey(KEY_LOAD_PROPS_FORMAT)
|
||||
&& StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(KEY_LOAD_PROPS_FORMAT)))) {
|
||||
return StreamLoadFormat.JSON;
|
||||
}
|
||||
return StreamLoadFormat.CSV;
|
||||
}
|
||||
|
||||
private void validateStreamLoadUrl() {
|
||||
List<String> urlList = getLoadUrlList();
|
||||
for (String host : urlList) {
|
||||
if (host.split(":").length < 2) {
|
||||
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
|
||||
"loadUrl的格式不正确,请输入 `fe_ip:fe_http_ip;fe_ip:fe_http_ip`。");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void validateRequired() {
|
||||
final String[] requiredOptionKeys = new String[]{
|
||||
KEY_USERNAME,
|
||||
KEY_DATABASE,
|
||||
KEY_TABLE,
|
||||
KEY_COLUMN,
|
||||
KEY_LOAD_URL
|
||||
};
|
||||
for (String optionKey : requiredOptionKeys) {
|
||||
options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE);
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,20 @@
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
/**
 * Value holder pairing a label with a list of serialized rows and a byte count.
 *
 * <p>Only the label can be changed after construction; the row list is stored and returned by
 * reference, without copying.
 */
public class DorisWriterTuple {

    private final List<byte[]> rows;
    // Presumably the total size of {@code rows} in bytes; confirm against the producer.
    private final Long bytes;
    private String label;

    /**
     * @param label label attached to this batch
     * @param bytes byte count associated with the batch
     * @param rows  serialized rows, kept by reference
     */
    public DorisWriterTuple(String label, Long bytes, List<byte[]> rows) {
        this.rows = rows;
        this.bytes = bytes;
        this.label = label;
    }

    /** @return the current label */
    public String getLabel() {
        return this.label;
    }

    /** @param label replacement label */
    public void setLabel(String label) {
        this.label = label;
    }

    /** @return the byte count given at construction */
    public Long getBytes() {
        return this.bytes;
    }

    /** @return the same row list instance given at construction */
    public List<byte[]> getRows() {
        return this.rows;
    }
}
|
@ -1,42 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
/**
 * Handler for escape in properties.
 *
 * <p>Decodes {@code \xHH} sequences (two hexadecimal digits) into the character with that code,
 * but only for strings that begin with {@code \x}.
 */
public class EscapeHandler {

    /** Prefix a string must start with for decoding to be attempted. */
    public static final String ESCAPE_DELIMITERS_FLAGS = "\\x";

    /** Matches {@code \x} followed by exactly two hexadecimal digits, captured as group 1. */
    public static final Pattern ESCAPE_PATTERN = Pattern.compile("\\\\x([0-9a-fA-F]{2})");

    /**
     * Replaces every {@code \xHH} sequence in {@code source} with the character it encodes.
     *
     * @param source text to decode; may be null
     * @return the decoded text; {@code source} unchanged (including null) when it does not start
     *         with {@code \x}
     */
    public static String escapeString(String source) {
        if (source == null || !source.startsWith(ESCAPE_DELIMITERS_FLAGS)) {
            return source;
        }
        Matcher m = ESCAPE_PATTERN.matcher(source);
        // StringBuffer is required by Matcher.appendReplacement on Java 8.
        StringBuffer buf = new StringBuffer();
        while (m.find()) {
            String decoded = String.valueOf((char) Integer.parseInt(m.group(1), 16));
            // Quote the replacement: a decoded '$' or '\' would otherwise be read as a group
            // reference or escape by appendReplacement.
            m.appendReplacement(buf, Matcher.quoteReplacement(decoded));
        }
        m.appendTail(buf);
        return buf.toString();
    }
}
|
@ -1,188 +0,0 @@
|
||||
// Licensed to the Apache Software Foundation (ASF) under one
|
||||
// or more contributor license agreements. See the NOTICE file
|
||||
// distributed with this work for additional information
|
||||
// regarding copyright ownership. The ASF licenses this file
|
||||
// to you under the Apache License, Version 2.0 (the
|
||||
// "License"); you may not use this file except in compliance
|
||||
// with the License. You may obtain a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing,
|
||||
// software distributed under the License is distributed on an
|
||||
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||||
// KIND, either express or implied. See the License for the
|
||||
// specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||
|
||||
import com.alibaba.datax.common.exception.DataXException;
|
||||
import com.alibaba.datax.common.util.Configuration;
|
||||
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
|
||||
import com.google.common.base.Strings;
|
||||
|
||||
import java.io.Serializable;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
public class Key implements Serializable {
|
||||
public static final String FE_LOAD_URL = "loadUrl";
|
||||
public static final String BE_LOAD_URL = "beLoadUrl";
|
||||
public static final String JDBC_URL = "connection[0].jdbcUrl";
|
||||
|
||||
public static final String DATABASE = "connection[0].selectedDatabase";
|
||||
public static final String TABLE = "connection[0].table[0]";
|
||||
public static final String COLUMN = "column";
|
||||
|
||||
public static final String USERNAME = "username";
|
||||
public static final String PASSWORD = "password";
|
||||
|
||||
public static final String PRE_SQL = "preSql";
|
||||
public static final String POST_SQL = "postSql";
|
||||
|
||||
public static final String LOAD_PROPS = "loadProps";
|
||||
public static final String LOAD_PROPS_LINE_DELIMITER = "line_delimiter";
|
||||
public static final String LOAD_PROPS_COLUMN_SEPARATOR = "column_separator";
|
||||
|
||||
public static final String MAX_BATCH_ROWS = "maxBatchRows";
|
||||
public static final String BATCH_BYTE_SIZE = "maxBatchSize";
|
||||
public static final String MAX_RETRIES = "maxRetries";
|
||||
public static final String LABEL_PREFIX = "labelPrefix";
|
||||
public static final String FORMAT = "format";
|
||||
public static final String CONNECT_TIMEOUT = "connectTimeout";
|
||||
private final Configuration options;
|
||||
|
||||
private static final long DEFAULT_MAX_BATCH_ROWS = 500000;
|
||||
|
||||
private static final long DEFAULT_BATCH_BYTE_SIZE = 90 * 1024 * 1024;
|
||||
private static final int DEFAULT_MAX_RETRIES = 0;
|
||||
|
||||
private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_";
|
||||
private static final String DEFAULT_COLUMN_SEPARATOR = "\\x01";
|
||||
private static final String DEFAULT_LINE_DELIMITER = "\\x02";
|
||||
public static final String DEFAULT_FORMAT_CSV = "csv";
|
||||
private static final String DEFAULT_TIME_ZONE = "+08:00";
|
||||
private static final int DEFAULT_CONNECT_TIMEOUT = -1;
|
||||
|
||||
public Key(final Configuration options) {
|
||||
this.options = options;
|
||||
}
|
||||
|
||||
public void doPretreatment() {
|
||||
this.validateRequired();
|
||||
this.validateStreamLoadUrl();
|
||||
this.validateFormat();
|
||||
}
|
||||
|
||||
public String getJdbcUrl() {
|
||||
return this.options.getString(JDBC_URL);
|
||||
}
|
||||
|
||||
public String getDatabase() {
|
||||
return this.options.getString(DATABASE);
|
||||
}
|
||||
|
||||
public String getTable() {
|
||||
return this.options.getString(TABLE);
|
||||
}
|
||||
|
||||
public String getUsername() {
|
||||
return this.options.getString(USERNAME);
|
||||
}
|
||||
|
||||
public String getPassword() {
|
||||
return Strings.nullToEmpty(this.options.getString(PASSWORD));
|
||||
}
|
||||
|
||||
public List<String> getBeLoadUrlList() {
|
||||
return this.options.getList(BE_LOAD_URL, String.class);
|
||||
}
|
||||
|
||||
public List<String> getFeLoadUrlList() {
|
||||
return this.options.getList(FE_LOAD_URL, String.class);
|
||||
}
|
||||
|
||||
public List<String> getColumns() {
|
||||
return this.options.getList(COLUMN, String.class);
|
||||
}
|
||||
|
||||
public List<String> getPreSqlList() {
|
||||
return this.options.getList(PRE_SQL, String.class);
|
||||
}
|
||||
|
||||
public List<String> getPostSqlList() {
|
||||
return this.options.getList(POST_SQL, String.class);
|
||||
}
|
||||
|
||||
public Map<String, Object> getLoadProps() {
|
||||
return this.options.getMap(LOAD_PROPS, new HashMap<>());
|
||||
}
|
||||
|
||||
public long getBatchRows() {
|
||||
return this.options.getLong(MAX_BATCH_ROWS, DEFAULT_MAX_BATCH_ROWS);
|
||||
}
|
||||
|
||||
public long getBatchByteSize() {
|
||||
return this.options.getLong(BATCH_BYTE_SIZE, DEFAULT_BATCH_BYTE_SIZE);
|
||||
}
|
||||
|
||||
public int getMaxRetries() {
|
||||
return this.options.getInt(MAX_RETRIES, DEFAULT_MAX_RETRIES);
|
||||
}
|
||||
|
||||
public String getLabelPrefix() {
|
||||
return this.options.getString(LABEL_PREFIX, DEFAULT_LABEL_PREFIX);
|
||||
}
|
||||
|
||||
public String getLineDelimiter() {
|
||||
return getLoadProps().getOrDefault(LOAD_PROPS_LINE_DELIMITER, DEFAULT_LINE_DELIMITER).toString();
|
||||
}
|
||||
|
||||
public String getFormat() {
|
||||
return this.options.getString(FORMAT, DEFAULT_FORMAT_CSV);
|
||||
}
|
||||
|
||||
public String getColumnSeparator() {
|
||||
return getLoadProps().getOrDefault(LOAD_PROPS_COLUMN_SEPARATOR, DEFAULT_COLUMN_SEPARATOR).toString();
|
||||
}
|
||||
|
||||
public int getConnectTimeout() {
|
||||
return this.options.getInt(CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT);
|
||||
}
|
||||
|
||||
|
||||
private void validateStreamLoadUrl() {
|
||||
List<String> urlList = this.getBeLoadUrlList();
|
||||
if (urlList == null) {
|
||||
urlList = this.getFeLoadUrlList();
|
||||
}
|
||||
if (urlList == null || urlList.isEmpty()) {
|
||||
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, "Either beLoadUrl or feLoadUrl must be set");
|
||||
}
|
||||
|
||||
for (final String host : urlList) {
|
||||
if (host.split(":").length < 2) {
|
||||
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
|
||||
"Invalid load url format. IF use FE hosts, should be like: fe_host:fe_http_port."
|
||||
+ " If use BE hosts, should be like: be_host:be_webserver_port");
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void validateFormat() {
|
||||
String format = this.getFormat();
|
||||
if (!Arrays.asList("csv", "json").contains(format.toLowerCase())) {
|
||||
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, "format only supports csv or json");
|
||||
}
|
||||
}
|
||||
|
||||
private void validateRequired() {
|
||||
final String[] requiredOptionKeys = new String[]{JDBC_URL, USERNAME, DATABASE, TABLE, COLUMN};
|
||||
for (final String optionKey : requiredOptionKeys) {
|
||||
this.options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE);
|
||||
}
|
||||
}
|
||||
}
|
Loading…
Reference in New Issue
Block a user