mirror of
https://github.com/alibaba/DataX.git
synced 2025-05-03 01:49:34 +08:00
Refactoring the doriswriter write plugin
Refactoring the doriswriter write plugin
This commit is contained in:
parent
8b46e82a60
commit
fc4c162cff
@ -43,6 +43,7 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter
|
|||||||
"password": "xxxxxx",
|
"password": "xxxxxx",
|
||||||
"postSql": ["select count(1) from all_employees_info"],
|
"postSql": ["select count(1) from all_employees_info"],
|
||||||
"preSql": [],
|
"preSql": [],
|
||||||
|
"flushInterval":30000,
|
||||||
"connection": [
|
"connection": [
|
||||||
{
|
{
|
||||||
"jdbcUrl": "jdbc:mysql://172.16.0.13:9030/demo",
|
"jdbcUrl": "jdbc:mysql://172.16.0.13:9030/demo",
|
||||||
@ -77,7 +78,7 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter
|
|||||||
|
|
||||||
* **loadUrl**
|
* **loadUrl**
|
||||||
|
|
||||||
- 描述:和 **beLoadUrl** 二选一。作为 Stream Load 的连接目标。格式为 "ip:port"。其中 IP 是 FE 节点 IP,port 是 FE 节点的 http_port。可以填写多个,doriswriter 将以轮询的方式访问。
|
- 描述:作为 Stream Load 的连接目标。格式为 "ip:port"。其中 IP 是 FE 节点 IP,port 是 FE 节点的 http_port。可以填写多个,多个之间使用英文状态的分号隔开:`;`,doriswriter 将以轮询的方式访问。
|
||||||
- 必选:是
|
- 必选:是
|
||||||
- 默认值:无
|
- 默认值:无
|
||||||
|
|
||||||
@ -93,14 +94,12 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter
|
|||||||
- 必选:否
|
- 必选:否
|
||||||
- 默认值:空
|
- 默认值:空
|
||||||
|
|
||||||
* **database**
|
* **connection.selectedDatabase**
|
||||||
|
|
||||||
- 描述:需要写入的Doris数据库名称。
|
- 描述:需要写入的Doris数据库名称。
|
||||||
- 必选:是
|
- 必选:是
|
||||||
- 默认值:无
|
- 默认值:无
|
||||||
|
|
||||||
* **table**
|
* **connection.table**
|
||||||
|
|
||||||
- 描述:需要写入的Doris表名称。
|
- 描述:需要写入的Doris表名称。
|
||||||
- 必选:是
|
- 必选:是
|
||||||
- 默认值:无
|
- 默认值:无
|
||||||
@ -144,32 +143,26 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter
|
|||||||
|
|
||||||
* **labelPrefix**
|
* **labelPrefix**
|
||||||
|
|
||||||
- 描述:每批次导入任务的 label 前缀。最终的 label 将有 `labelPrefix + UUID + 序号` 组成
|
- 描述:每批次导入任务的 label 前缀。最终的 label 将由 `labelPrefix + UUID` 组成全局唯一的 label,确保数据不会重复导入。
|
||||||
- 必选:否
|
- 必选:否
|
||||||
- 默认值:`datax_doris_writer_`
|
- 默认值:`datax_doris_writer_`
|
||||||
|
|
||||||
* **format**
|
|
||||||
|
|
||||||
- 描述:StreamLoad数据的组装格式,支持csv和json格式。csv默认的行分隔符是\x01,列分隔符是\x02。
|
|
||||||
- 必选:否
|
|
||||||
- 默认值:csv
|
|
||||||
|
|
||||||
* **loadProps**
|
* **loadProps**
|
||||||
|
|
||||||
- 描述:StreamLoad 的请求参数,详情参照StreamLoad介绍页面。
|
- 描述:StreamLoad 的请求参数,详情参照StreamLoad介绍页面。[Stream load - Apache Doris](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/stream-load-manual)
|
||||||
|
|
||||||
|
其中包括导入的数据格式 format 等。导入数据格式默认使用 csv,同时也支持 JSON;具体可以参照下面的类型转换部分,也可以参照上面的 Stream load 官方文档。
|
||||||
|
|
||||||
- 必选:否
|
- 必选:否
|
||||||
|
|
||||||
- 默认值:无
|
- 默认值:无
|
||||||
|
|
||||||
* **connectTimeout**
|
|
||||||
|
|
||||||
- 描述:StreamLoad单次请求的超时时间, 单位毫秒(ms)。
|
|
||||||
- 必选:否
|
|
||||||
- 默认值:-1
|
|
||||||
|
|
||||||
### 类型转换
|
### 类型转换
|
||||||
|
|
||||||
默认传入的数据均会被转为字符串,并以`\t`作为列分隔符,`\n`作为行分隔符,组成`csv`文件进行StreamLoad导入操作。
|
默认传入的数据均会被转为字符串,并以`\t`作为列分隔符,`\n`作为行分隔符,组成`csv`文件进行StreamLoad导入操作。
|
||||||
如需更改列分隔符, 则正确配置 `loadProps` 即可:
|
|
||||||
|
默认是csv格式导入,如需更改列分隔符, 则正确配置 `loadProps` 即可:
|
||||||
|
|
||||||
```json
|
```json
|
||||||
"loadProps": {
|
"loadProps": {
|
||||||
"column_separator": "\\x01",
|
"column_separator": "\\x01",
|
||||||
@ -184,3 +177,5 @@ DorisWriter 通过Doris原生支持Stream load方式导入数据, DorisWriter
|
|||||||
"strip_outer_array": true
|
"strip_outer_array": true
|
||||||
}
|
}
|
||||||
```
|
```
|
||||||
|
|
||||||
|
更多信息请参照 Doris 官网:[Stream load - Apache Doris](https://doris.apache.org/zh-CN/docs/data-operate/import/import-way/stream-load-manual)
|
@ -4,7 +4,7 @@ import com.google.common.base.Strings;
|
|||||||
|
|
||||||
import java.io.StringWriter;
|
import java.io.StringWriter;
|
||||||
|
|
||||||
public class DorisDelimiterParser {
|
public class DelimiterParser {
|
||||||
|
|
||||||
private static final String HEX_STRING = "0123456789ABCDEF";
|
private static final String HEX_STRING = "0123456789ABCDEF";
|
||||||
|
|
@ -2,8 +2,8 @@ package com.alibaba.datax.plugin.writer.doriswriter;
|
|||||||
|
|
||||||
import com.alibaba.datax.common.element.Column;
|
import com.alibaba.datax.common.element.Column;
|
||||||
|
|
||||||
public class DorisBaseSerializer {
|
public class DorisBaseCodec {
|
||||||
protected String fieldConvertion( Column col) {
|
protected String convertionField( Column col) {
|
||||||
if (null == col.getRawData() || Column.Type.NULL == col.getType()) {
|
if (null == col.getRawData() || Column.Type.NULL == col.getType()) {
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
@ -4,7 +4,7 @@ import com.alibaba.datax.common.element.Record;
|
|||||||
|
|
||||||
import java.io.Serializable;
|
import java.io.Serializable;
|
||||||
|
|
||||||
public interface DorisSerializer extends Serializable {
|
public interface DorisCodec extends Serializable {
|
||||||
|
|
||||||
String serialize( Record row);
|
String codec( Record row);
|
||||||
}
|
}
|
@ -0,0 +1,19 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||||
|
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
public class DorisCodecFactory {
|
||||||
|
public DorisCodecFactory (){
|
||||||
|
|
||||||
|
}
|
||||||
|
public static DorisCodec createCodec( Keys writerOptions) {
|
||||||
|
if ( Keys.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
|
||||||
|
Map<String, Object> props = writerOptions.getLoadProps();
|
||||||
|
return new DorisCsvCodec (null == props || !props.containsKey("column_separator") ? null : String.valueOf(props.get("column_separator")));
|
||||||
|
}
|
||||||
|
if ( Keys.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
|
||||||
|
return new DorisJsonCodec (writerOptions.getColumns());
|
||||||
|
}
|
||||||
|
throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties.");
|
||||||
|
}
|
||||||
|
}
|
@ -2,20 +2,21 @@ package com.alibaba.datax.plugin.writer.doriswriter;
|
|||||||
|
|
||||||
import com.alibaba.datax.common.element.Record;
|
import com.alibaba.datax.common.element.Record;
|
||||||
|
|
||||||
public class DorisCsvSerializer extends DorisBaseSerializer implements DorisSerializer{
|
public class DorisCsvCodec extends DorisBaseCodec implements DorisCodec {
|
||||||
|
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
private final String columnSeparator;
|
private final String columnSeparator;
|
||||||
|
|
||||||
public DorisCsvSerializer(String sp) {
|
public DorisCsvCodec ( String sp) {
|
||||||
this.columnSeparator = DorisDelimiterParser.parse(sp, "\t");
|
this.columnSeparator = DelimiterParser.parse(sp, "\t");
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String serialize( Record row) {
|
public String codec( Record row) {
|
||||||
StringBuilder sb = new StringBuilder();
|
StringBuilder sb = new StringBuilder();
|
||||||
for (int i = 0; i < row.getColumnNumber(); i++) {
|
for (int i = 0; i < row.getColumnNumber(); i++) {
|
||||||
String value = fieldConvertion(row.getColumn(i));
|
String value = convertionField(row.getColumn(i));
|
||||||
sb.append(null == value ? "\\N" : value);
|
sb.append(null == value ? "\\N" : value);
|
||||||
if (i < row.getColumnNumber() - 1) {
|
if (i < row.getColumnNumber() - 1) {
|
||||||
sb.append(columnSeparator);
|
sb.append(columnSeparator);
|
@ -7,25 +7,25 @@ import java.util.HashMap;
|
|||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
public class DorisJsonSerializer extends DorisBaseSerializer implements DorisSerializer{
|
public class DorisJsonCodec extends DorisBaseCodec implements DorisCodec {
|
||||||
|
|
||||||
private static final long serialVersionUID = 1L;
|
private static final long serialVersionUID = 1L;
|
||||||
|
|
||||||
private final List<String> fieldNames;
|
private final List<String> fieldNames;
|
||||||
|
|
||||||
public DorisJsonSerializer( List<String> fieldNames) {
|
public DorisJsonCodec ( List<String> fieldNames) {
|
||||||
this.fieldNames = fieldNames;
|
this.fieldNames = fieldNames;
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public String serialize( Record row) {
|
public String codec( Record row) {
|
||||||
if (null == fieldNames) {
|
if (null == fieldNames) {
|
||||||
return "";
|
return "";
|
||||||
}
|
}
|
||||||
Map<String, Object> rowMap = new HashMap<> (fieldNames.size());
|
Map<String, Object> rowMap = new HashMap<> (fieldNames.size());
|
||||||
int idx = 0;
|
int idx = 0;
|
||||||
for (String fieldName : fieldNames) {
|
for (String fieldName : fieldNames) {
|
||||||
rowMap.put(fieldName, fieldConvertion(row.getColumn(idx)));
|
rowMap.put(fieldName, convertionField(row.getColumn(idx)));
|
||||||
idx++;
|
idx++;
|
||||||
}
|
}
|
||||||
return JSON.toJSONString(rowMap);
|
return JSON.toJSONString(rowMap);
|
@ -1,19 +0,0 @@
|
|||||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
|
||||||
|
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
public class DorisSerializerFactory {
|
|
||||||
public DorisSerializerFactory(){
|
|
||||||
|
|
||||||
}
|
|
||||||
public static DorisSerializer createSerializer(DorisWriterOptions writerOptions) {
|
|
||||||
if (DorisWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) {
|
|
||||||
Map<String, Object> props = writerOptions.getLoadProps();
|
|
||||||
return new DorisCsvSerializer(null == props || !props.containsKey("column_separator") ? null : String.valueOf(props.get("column_separator")));
|
|
||||||
}
|
|
||||||
if (DorisWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) {
|
|
||||||
return new DorisJsonSerializer(writerOptions.getColumns());
|
|
||||||
}
|
|
||||||
throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties.");
|
|
||||||
}
|
|
||||||
}
|
|
@ -27,10 +27,10 @@ import java.util.Map;
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
public class DorisStreamLoadVisitor {
|
public class DorisStreamLoadObserver {
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadVisitor.class);
|
private static final Logger LOG = LoggerFactory.getLogger(DorisStreamLoadObserver.class);
|
||||||
|
|
||||||
private DorisWriterOptions options;
|
private Keys options;
|
||||||
|
|
||||||
private long pos;
|
private long pos;
|
||||||
private static final String RESULT_FAILED = "Fail";
|
private static final String RESULT_FAILED = "Fail";
|
||||||
@ -42,14 +42,14 @@ public class DorisStreamLoadVisitor {
|
|||||||
private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";
|
private static final String RESULT_LABEL_UNKNOWN = "UNKNOWN";
|
||||||
|
|
||||||
|
|
||||||
public DorisStreamLoadVisitor(DorisWriterOptions options){
|
public DorisStreamLoadObserver ( Keys options){
|
||||||
this.options = options;
|
this.options = options;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void streamLoad(DorisWriterTuple data) throws Exception {
|
public void streamLoad(WriterTuple data) throws Exception {
|
||||||
String host = getAvailableHost();
|
String host = getLoadHost();
|
||||||
if(host == null){
|
if(host == null){
|
||||||
throw new IOException ("None of the host in `load_url` could be connected.");
|
throw new IOException ("load_url cannot be empty, or the host cannot connect.Please check your configuration.");
|
||||||
}
|
}
|
||||||
String loadUrl = new StringBuilder(host)
|
String loadUrl = new StringBuilder(host)
|
||||||
.append("/api/")
|
.append("/api/")
|
||||||
@ -58,25 +58,25 @@ public class DorisStreamLoadVisitor {
|
|||||||
.append(options.getTable())
|
.append(options.getTable())
|
||||||
.append("/_stream_load")
|
.append("/_stream_load")
|
||||||
.toString();
|
.toString();
|
||||||
LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", data.getRows().size(), data.getBytes(), data.getLabel()));
|
LOG.info("Start to join batch data: rows[{}] bytes[{}] label[{}].", data.getRows().size(), data.getBytes(), data.getLabel());
|
||||||
Map<String, Object> loadResult = doHttpPut(loadUrl, data.getLabel(), joinRows(data.getRows(), data.getBytes().intValue()));
|
Map<String, Object> loadResult = put(loadUrl, data.getLabel(), addRows(data.getRows(), data.getBytes().intValue()));
|
||||||
|
LOG.info("StreamLoad response :{}",JSON.toJSONString(loadResult));
|
||||||
final String keyStatus = "Status";
|
final String keyStatus = "Status";
|
||||||
if (null == loadResult || !loadResult.containsKey(keyStatus)) {
|
if (null == loadResult || !loadResult.containsKey(keyStatus)) {
|
||||||
throw new IOException("Unable to flush data to Doris: unknown result status.");
|
throw new IOException("Unable to flush data to Doris: unknown result status.");
|
||||||
}
|
}
|
||||||
LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString());
|
LOG.debug("StreamLoad response:{}",JSON.toJSONString(loadResult));
|
||||||
if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {
|
if (RESULT_FAILED.equals(loadResult.get(keyStatus))) {
|
||||||
throw new IOException(
|
throw new IOException(
|
||||||
new StringBuilder("Failed to flush data to Doris.\n").append(JSON.toJSONString(loadResult)).toString()
|
new StringBuilder("Failed to flush data to Doris.\n").append(JSON.toJSONString(loadResult)).toString()
|
||||||
);
|
);
|
||||||
} else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
|
} else if (RESULT_LABEL_EXISTED.equals(loadResult.get(keyStatus))) {
|
||||||
LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString());
|
LOG.debug("StreamLoad response:{}",JSON.toJSONString(loadResult));
|
||||||
// has to block-checking the state to get the final result
|
checkStreamLoadState(host, data.getLabel());
|
||||||
checkLabelState(host, data.getLabel());
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkLabelState(String host, String label) throws IOException {
|
private void checkStreamLoadState(String host, String label) throws IOException {
|
||||||
int idx = 0;
|
int idx = 0;
|
||||||
while(true) {
|
while(true) {
|
||||||
try {
|
try {
|
||||||
@ -109,7 +109,7 @@ public class DorisStreamLoadVisitor {
|
|||||||
case RESULT_LABEL_PREPARE:
|
case RESULT_LABEL_PREPARE:
|
||||||
continue;
|
continue;
|
||||||
case RESULT_LABEL_ABORTED:
|
case RESULT_LABEL_ABORTED:
|
||||||
throw new DorisStreamLoadExcetion(String.format("Failed to flush data to Doris, Error " +
|
throw new DorisWriterExcetion (String.format("Failed to flush data to Doris, Error " +
|
||||||
"label[%s] state[%s]\n", label, labelState), null, true);
|
"label[%s] state[%s]\n", label, labelState), null, true);
|
||||||
case RESULT_LABEL_UNKNOWN:
|
case RESULT_LABEL_UNKNOWN:
|
||||||
default:
|
default:
|
||||||
@ -121,10 +121,10 @@ public class DorisStreamLoadVisitor {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private byte[] joinRows(List<byte[]> rows, int totalBytes) {
|
private byte[] addRows(List<byte[]> rows, int totalBytes) {
|
||||||
if (DorisWriterOptions.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
|
if (Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
|
||||||
Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<> () : options.getLoadProps());
|
Map<String, Object> props = (options.getLoadProps() == null ? new HashMap<> () : options.getLoadProps());
|
||||||
byte[] lineDelimiter = DorisDelimiterParser.parse((String)props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
|
byte[] lineDelimiter = DelimiterParser.parse((String)props.get("row_delimiter"), "\n").getBytes(StandardCharsets.UTF_8);
|
||||||
ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
|
ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length);
|
||||||
for (byte[] row : rows) {
|
for (byte[] row : rows) {
|
||||||
bos.put(row);
|
bos.put(row);
|
||||||
@ -133,7 +133,7 @@ public class DorisStreamLoadVisitor {
|
|||||||
return bos.array();
|
return bos.array();
|
||||||
}
|
}
|
||||||
|
|
||||||
if (DorisWriterOptions.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
|
if (Keys.StreamLoadFormat.JSON.equals(options.getStreamLoadFormat())) {
|
||||||
ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
|
ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1));
|
||||||
bos.put("[".getBytes(StandardCharsets.UTF_8));
|
bos.put("[".getBytes(StandardCharsets.UTF_8));
|
||||||
byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
|
byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8);
|
||||||
@ -150,7 +150,7 @@ public class DorisStreamLoadVisitor {
|
|||||||
}
|
}
|
||||||
throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
|
throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:");
|
||||||
}
|
}
|
||||||
private Map<String, Object> doHttpPut(String loadUrl, String label, byte[] data) throws IOException {
|
private Map<String, Object> put(String loadUrl, String label, byte[] data) throws IOException {
|
||||||
LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
|
LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length));
|
||||||
final HttpClientBuilder httpClientBuilder = HttpClients.custom()
|
final HttpClientBuilder httpClientBuilder = HttpClients.custom()
|
||||||
.setRedirectStrategy(new DefaultRedirectStrategy () {
|
.setRedirectStrategy(new DefaultRedirectStrategy () {
|
||||||
@ -162,7 +162,7 @@ public class DorisStreamLoadVisitor {
|
|||||||
try ( CloseableHttpClient httpclient = httpClientBuilder.build()) {
|
try ( CloseableHttpClient httpclient = httpClientBuilder.build()) {
|
||||||
HttpPut httpPut = new HttpPut(loadUrl);
|
HttpPut httpPut = new HttpPut(loadUrl);
|
||||||
List<String> cols = options.getColumns();
|
List<String> cols = options.getColumns();
|
||||||
if (null != cols && !cols.isEmpty() && DorisWriterOptions.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
|
if (null != cols && !cols.isEmpty() && Keys.StreamLoadFormat.CSV.equals(options.getStreamLoadFormat())) {
|
||||||
httpPut.setHeader("columns", String.join(",", cols.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList())));
|
httpPut.setHeader("columns", String.join(",", cols.stream().map(f -> String.format("`%s`", f)).collect(Collectors.toList())));
|
||||||
}
|
}
|
||||||
if (null != options.getLoadProps()) {
|
if (null != options.getLoadProps()) {
|
||||||
@ -205,19 +205,19 @@ public class DorisStreamLoadVisitor {
|
|||||||
return respEntity;
|
return respEntity;
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getAvailableHost() {
|
private String getLoadHost() {
|
||||||
List<String> hostList = options.getLoadUrlList();
|
List<String> hostList = options.getLoadUrlList();
|
||||||
long tmp = pos + hostList.size();
|
long tmp = pos + hostList.size();
|
||||||
for (; pos < tmp; pos++) {
|
for (; pos < tmp; pos++) {
|
||||||
String host = new StringBuilder("http://").append(hostList.get((int) (pos % hostList.size()))).toString();
|
String host = new StringBuilder("http://").append(hostList.get((int) (pos % hostList.size()))).toString();
|
||||||
if (tryHttpConnection(host)) {
|
if (checkConnection(host)) {
|
||||||
return host;
|
return host;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return null;
|
return null;
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean tryHttpConnection(String host) {
|
private boolean checkConnection(String host) {
|
||||||
try {
|
try {
|
||||||
URL url = new URL(host);
|
URL url = new URL(host);
|
||||||
HttpURLConnection co = (HttpURLConnection) url.openConnection();
|
HttpURLConnection co = (HttpURLConnection) url.openConnection();
|
||||||
@ -226,7 +226,7 @@ public class DorisStreamLoadVisitor {
|
|||||||
co.disconnect();
|
co.disconnect();
|
||||||
return true;
|
return true;
|
||||||
} catch (Exception e1) {
|
} catch (Exception e1) {
|
||||||
LOG.warn("Failed to connect to address:{}", host, e1);
|
e1.printStackTrace();
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -71,7 +71,7 @@ public class DorisUtil {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void preCheckPrePareSQL(DorisWriterOptions options) {
|
public static void preCheckPrePareSQL( Keys options) {
|
||||||
String table = options.getTable();
|
String table = options.getTable();
|
||||||
List<String> preSqls = options.getPreSqlList();
|
List<String> preSqls = options.getPreSqlList();
|
||||||
List<String> renderedPreSqls = DorisUtil.renderPreOrPostSqls(preSqls, table);
|
List<String> renderedPreSqls = DorisUtil.renderPreOrPostSqls(preSqls, table);
|
||||||
@ -87,7 +87,7 @@ public class DorisUtil {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public static void preCheckPostSQL(DorisWriterOptions options) {
|
public static void preCheckPostSQL( Keys options) {
|
||||||
String table = options.getTable();
|
String table = options.getTable();
|
||||||
List<String> postSqls = options.getPostSqlList();
|
List<String> postSqls = options.getPostSqlList();
|
||||||
List<String> renderedPostSqls = DorisUtil.renderPreOrPostSqls(postSqls, table);
|
List<String> renderedPostSqls = DorisUtil.renderPreOrPostSqls(postSqls, table);
|
||||||
|
@ -25,19 +25,12 @@ import com.alibaba.datax.common.util.Configuration;
|
|||||||
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
|
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
|
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
|
||||||
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
|
|
||||||
import com.alibaba.datax.plugin.rdbms.writer.Constant;
|
|
||||||
import com.alibaba.druid.sql.parser.ParserException;
|
|
||||||
import com.google.common.base.Strings;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.sql.Connection;
|
import java.sql.Connection;
|
||||||
import java.sql.Statement;
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.UUID;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* doris data writer
|
* doris data writer
|
||||||
@ -48,12 +41,12 @@ public class DorisWriter extends Writer {
|
|||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
|
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
|
||||||
private Configuration originalConfig = null;
|
private Configuration originalConfig = null;
|
||||||
private DorisWriterOptions options;
|
private Keys options;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init() {
|
public void init() {
|
||||||
this.originalConfig = super.getPluginJobConf();
|
this.originalConfig = super.getPluginJobConf();
|
||||||
options = new DorisWriterOptions(super.getPluginJobConf());
|
options = new Keys (super.getPluginJobConf());
|
||||||
options.doPretreatment();
|
options.doPretreatment();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -92,11 +85,10 @@ public class DorisWriter extends Writer {
|
|||||||
String username = options.getUsername();
|
String username = options.getUsername();
|
||||||
String password = options.getPassword();
|
String password = options.getPassword();
|
||||||
String jdbcUrl = options.getJdbcUrl();
|
String jdbcUrl = options.getJdbcUrl();
|
||||||
LOG.info("userName :{},password:{},jdbcUrl:{}.", username,password,jdbcUrl);
|
|
||||||
List<String> renderedPostSqls = DorisUtil.renderPreOrPostSqls(options.getPostSqlList(), options.getTable());
|
List<String> renderedPostSqls = DorisUtil.renderPreOrPostSqls(options.getPostSqlList(), options.getTable());
|
||||||
if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
|
if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) {
|
||||||
Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
|
Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password);
|
||||||
LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPostSqls), jdbcUrl);
|
LOG.info("Start to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPostSqls), jdbcUrl);
|
||||||
DorisUtil.executeSqls(conn, renderedPostSqls);
|
DorisUtil.executeSqls(conn, renderedPostSqls);
|
||||||
DBUtil.closeDBResources(null, null, conn);
|
DBUtil.closeDBResources(null, null, conn);
|
||||||
}
|
}
|
||||||
@ -110,19 +102,19 @@ public class DorisWriter extends Writer {
|
|||||||
|
|
||||||
public static class Task extends Writer.Task {
|
public static class Task extends Writer.Task {
|
||||||
private DorisWriterManager writerManager;
|
private DorisWriterManager writerManager;
|
||||||
private DorisWriterOptions options;
|
private Keys options;
|
||||||
private DorisSerializer rowSerializer;
|
private DorisCodec rowCodec;
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void init() {
|
public void init() {
|
||||||
options = new DorisWriterOptions(super.getPluginJobConf());
|
options = new Keys (super.getPluginJobConf());
|
||||||
if (options.isWildcardColumn()) {
|
if (options.isWildcardColumn()) {
|
||||||
Connection conn = DBUtil.getConnection(DataBaseType.MySql, options.getJdbcUrl(), options.getUsername(), options.getPassword());
|
Connection conn = DBUtil.getConnection(DataBaseType.MySql, options.getJdbcUrl(), options.getUsername(), options.getPassword());
|
||||||
List<String> columns = DorisUtil.getDorisTableColumns(conn, options.getDatabase(), options.getTable());
|
List<String> columns = DorisUtil.getDorisTableColumns(conn, options.getDatabase(), options.getTable());
|
||||||
options.setInfoCchemaColumns(columns);
|
options.setInfoCchemaColumns(columns);
|
||||||
}
|
}
|
||||||
writerManager = new DorisWriterManager(options);
|
writerManager = new DorisWriterManager(options);
|
||||||
rowSerializer = DorisSerializerFactory.createSerializer(options);
|
rowCodec = DorisCodecFactory.createCodec(options);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -138,11 +130,14 @@ public class DorisWriter extends Writer {
|
|||||||
.asDataXException(
|
.asDataXException(
|
||||||
DBUtilErrorCode.CONF_ERROR,
|
DBUtilErrorCode.CONF_ERROR,
|
||||||
String.format(
|
String.format(
|
||||||
"列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.",
|
"There is an error in the column configuration information. " +
|
||||||
|
"This is because you have configured a task where the number of fields to be read from the source:%s " +
|
||||||
|
"is not equal to the number of fields to be written to the destination table:%s. " +
|
||||||
|
"Please check your configuration and make changes.",
|
||||||
record.getColumnNumber(),
|
record.getColumnNumber(),
|
||||||
options.getColumns().size()));
|
options.getColumns().size()));
|
||||||
}
|
}
|
||||||
writerManager.writeRecord(rowSerializer.serialize(record));
|
writerManager.writeRecord(rowCodec.codec(record));
|
||||||
}
|
}
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
|
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e);
|
||||||
|
@ -3,17 +3,17 @@ package com.alibaba.datax.plugin.writer.doriswriter;
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
public class DorisStreamLoadExcetion extends IOException {
|
public class DorisWriterExcetion extends IOException {
|
||||||
|
|
||||||
private final Map<String, Object> response;
|
private final Map<String, Object> response;
|
||||||
private boolean reCreateLabel;
|
private boolean reCreateLabel;
|
||||||
|
|
||||||
public DorisStreamLoadExcetion(String message, Map<String, Object> response) {
|
public DorisWriterExcetion ( String message, Map<String, Object> response) {
|
||||||
super(message);
|
super(message);
|
||||||
this.response = response;
|
this.response = response;
|
||||||
}
|
}
|
||||||
|
|
||||||
public DorisStreamLoadExcetion(String message, Map<String, Object> response, boolean reCreateLabel) {
|
public DorisWriterExcetion ( String message, Map<String, Object> response, boolean reCreateLabel) {
|
||||||
super(message);
|
super(message);
|
||||||
this.response = response;
|
this.response = response;
|
||||||
this.reCreateLabel = reCreateLabel;
|
this.reCreateLabel = reCreateLabel;
|
@ -20,20 +20,20 @@ public class DorisWriterManager {
|
|||||||
|
|
||||||
private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class);
|
private static final Logger LOG = LoggerFactory.getLogger(DorisWriterManager.class);
|
||||||
|
|
||||||
private final DorisStreamLoadVisitor visitor;
|
private final DorisStreamLoadObserver visitor;
|
||||||
private final DorisWriterOptions options;
|
private final Keys options;
|
||||||
private final List<byte[]> buffer = new ArrayList<> ();
|
private final List<byte[]> buffer = new ArrayList<> ();
|
||||||
private int batchCount = 0;
|
private int batchCount = 0;
|
||||||
private long batchSize = 0;
|
private long batchSize = 0;
|
||||||
private volatile boolean closed = false;
|
private volatile boolean closed = false;
|
||||||
private volatile Exception flushException;
|
private volatile Exception flushException;
|
||||||
private final LinkedBlockingDeque<DorisWriterTuple> flushQueue;
|
private final LinkedBlockingDeque< WriterTuple > flushQueue;
|
||||||
private ScheduledExecutorService scheduler;
|
private ScheduledExecutorService scheduler;
|
||||||
private ScheduledFuture<?> scheduledFuture;
|
private ScheduledFuture<?> scheduledFuture;
|
||||||
|
|
||||||
public DorisWriterManager(DorisWriterOptions options) {
|
public DorisWriterManager( Keys options) {
|
||||||
this.options = options;
|
this.options = options;
|
||||||
this.visitor = new DorisStreamLoadVisitor(options);
|
this.visitor = new DorisStreamLoadObserver (options);
|
||||||
flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength());
|
flushQueue = new LinkedBlockingDeque<>(options.getFlushQueueLength());
|
||||||
this.startScheduler();
|
this.startScheduler();
|
||||||
this.startAsyncFlushing();
|
this.startAsyncFlushing();
|
||||||
@ -92,7 +92,7 @@ public class DorisWriterManager {
|
|||||||
}
|
}
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
flushQueue.put(new DorisWriterTuple(label, batchSize, new ArrayList<>(buffer)));
|
flushQueue.put(new WriterTuple (label, batchSize, new ArrayList<>(buffer)));
|
||||||
if (waitUtilDone) {
|
if (waitUtilDone) {
|
||||||
// wait the last flush
|
// wait the last flush
|
||||||
waitAsyncFlushingDone();
|
waitAsyncFlushingDone();
|
||||||
@ -145,13 +145,13 @@ public class DorisWriterManager {
|
|||||||
private void waitAsyncFlushingDone() throws InterruptedException {
|
private void waitAsyncFlushingDone() throws InterruptedException {
|
||||||
// wait previous flushings
|
// wait previous flushings
|
||||||
for (int i = 0; i <= options.getFlushQueueLength(); i++) {
|
for (int i = 0; i <= options.getFlushQueueLength(); i++) {
|
||||||
flushQueue.put(new DorisWriterTuple("", 0l, null));
|
flushQueue.put(new WriterTuple ("", 0l, null));
|
||||||
}
|
}
|
||||||
checkFlushException();
|
checkFlushException();
|
||||||
}
|
}
|
||||||
|
|
||||||
private void asyncFlush() throws Exception {
|
private void asyncFlush() throws Exception {
|
||||||
DorisWriterTuple flushData = flushQueue.take();
|
WriterTuple flushData = flushQueue.take();
|
||||||
if (Strings.isNullOrEmpty(flushData.getLabel())) {
|
if (Strings.isNullOrEmpty(flushData.getLabel())) {
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
@ -169,7 +169,7 @@ public class DorisWriterManager {
|
|||||||
if (i >= options.getMaxRetries()) {
|
if (i >= options.getMaxRetries()) {
|
||||||
throw new IOException(e);
|
throw new IOException(e);
|
||||||
}
|
}
|
||||||
if (e instanceof DorisStreamLoadExcetion && ((DorisStreamLoadExcetion)e).needReCreateLabel()) {
|
if (e instanceof DorisWriterExcetion && (( DorisWriterExcetion )e).needReCreateLabel()) {
|
||||||
String newLabel = createBatchLabel();
|
String newLabel = createBatchLabel();
|
||||||
LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel));
|
LOG.warn(String.format("Batch label changed from [%s] to [%s]", flushData.getLabel(), newLabel));
|
||||||
flushData.setLabel(newLabel);
|
flushData.setLabel(newLabel);
|
||||||
|
@ -1,174 +0,0 @@
|
|||||||
package com.alibaba.datax.plugin.writer.doriswriter;
|
|
||||||
|
|
||||||
import com.alibaba.datax.common.exception.DataXException;
|
|
||||||
import com.alibaba.datax.common.util.Configuration;
|
|
||||||
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
|
|
||||||
|
|
||||||
import java.io.Serializable;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
public class DorisWriterOptions implements Serializable {
|
|
||||||
|
|
||||||
private static final long serialVersionUID = 1l;
|
|
||||||
private static final long KILO_BYTES_SCALE = 1024l;
|
|
||||||
private static final long MEGA_BYTES_SCALE = KILO_BYTES_SCALE * KILO_BYTES_SCALE;
|
|
||||||
private static final int MAX_RETRIES = 3;
|
|
||||||
private static final int BATCH_ROWS = 500000;
|
|
||||||
private static final long BATCH_BYTES = 90 * MEGA_BYTES_SCALE;
|
|
||||||
private static final long FLUSH_INTERVAL = 300000;
|
|
||||||
|
|
||||||
private static final String KEY_LOAD_PROPS_FORMAT = "format";
|
|
||||||
public enum StreamLoadFormat {
|
|
||||||
CSV, JSON;
|
|
||||||
}
|
|
||||||
|
|
||||||
private static final String KEY_USERNAME = "username";
|
|
||||||
private static final String KEY_PASSWORD = "password";
|
|
||||||
private static final String KEY_DATABASE = "connection[0].database";
|
|
||||||
private static final String KEY_TABLE = "connection[0].table[0]";
|
|
||||||
private static final String KEY_COLUMN = "column";
|
|
||||||
private static final String KEY_PRE_SQL = "preSql";
|
|
||||||
private static final String KEY_POST_SQL = "postSql";
|
|
||||||
private static final String KEY_JDBC_URL = "connection[0].jdbcUrl";
|
|
||||||
private static final String KEY_LABEL_PREFIX = "labelPrefix";
|
|
||||||
private static final String KEY_MAX_BATCH_ROWS = "maxBatchRows";
|
|
||||||
private static final String KEY_MAX_BATCH_SIZE = "maxBatchSize";
|
|
||||||
private static final String KEY_FLUSH_INTERVAL = "flushInterval";
|
|
||||||
private static final String KEY_LOAD_URL = "loadUrl";
|
|
||||||
private static final String KEY_FLUSH_QUEUE_LENGTH = "flushQueueLength";
|
|
||||||
private static final String KEY_LOAD_PROPS = "loadProps";
|
|
||||||
|
|
||||||
private final Configuration options;
|
|
||||||
private List<String> infoCchemaColumns;
|
|
||||||
private List<String> userSetColumns;
|
|
||||||
private boolean isWildcardColumn;
|
|
||||||
|
|
||||||
public DorisWriterOptions(Configuration options) {
|
|
||||||
this.options = options;
|
|
||||||
this.userSetColumns = options.getList(KEY_COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList());
|
|
||||||
if (1 == options.getList(KEY_COLUMN, String.class).size() && "*".trim().equals(options.getList(KEY_COLUMN, String.class).get(0))) {
|
|
||||||
this.isWildcardColumn = true;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
public void doPretreatment() {
|
|
||||||
validateRequired();
|
|
||||||
validateStreamLoadUrl();
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getJdbcUrl() {
|
|
||||||
return options.getString(KEY_JDBC_URL);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getDatabase() {
|
|
||||||
return options.getString(KEY_DATABASE);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getTable() {
|
|
||||||
return options.getString(KEY_TABLE);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getUsername() {
|
|
||||||
return options.getString(KEY_USERNAME);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getPassword() {
|
|
||||||
return options.getString(KEY_PASSWORD);
|
|
||||||
}
|
|
||||||
|
|
||||||
public String getLabelPrefix() {
|
|
||||||
return options.getString(KEY_LABEL_PREFIX);
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<String> getLoadUrlList() {
|
|
||||||
return options.getList(KEY_LOAD_URL, String.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<String> getColumns() {
|
|
||||||
if (isWildcardColumn) {
|
|
||||||
return this.infoCchemaColumns;
|
|
||||||
}
|
|
||||||
return this.userSetColumns;
|
|
||||||
}
|
|
||||||
|
|
||||||
public boolean isWildcardColumn() {
|
|
||||||
return this.isWildcardColumn;
|
|
||||||
}
|
|
||||||
|
|
||||||
public void setInfoCchemaColumns(List<String> cols) {
|
|
||||||
this.infoCchemaColumns = cols;
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<String> getPreSqlList() {
|
|
||||||
return options.getList(KEY_PRE_SQL, String.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
public List<String> getPostSqlList() {
|
|
||||||
return options.getList(KEY_POST_SQL, String.class);
|
|
||||||
}
|
|
||||||
|
|
||||||
public Map<String, Object> getLoadProps() {
|
|
||||||
return options.getMap(KEY_LOAD_PROPS);
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getMaxRetries() {
|
|
||||||
return MAX_RETRIES;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getBatchRows() {
|
|
||||||
Integer rows = options.getInt(KEY_MAX_BATCH_ROWS);
|
|
||||||
return null == rows ? BATCH_ROWS : rows;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getBatchSize() {
|
|
||||||
Long size = options.getLong(KEY_MAX_BATCH_SIZE);
|
|
||||||
return null == size ? BATCH_BYTES : size;
|
|
||||||
}
|
|
||||||
|
|
||||||
public long getFlushInterval() {
|
|
||||||
Long interval = options.getLong(KEY_FLUSH_INTERVAL);
|
|
||||||
return null == interval ? FLUSH_INTERVAL : interval;
|
|
||||||
}
|
|
||||||
|
|
||||||
public int getFlushQueueLength() {
|
|
||||||
Integer len = options.getInt(KEY_FLUSH_QUEUE_LENGTH);
|
|
||||||
return null == len ? 1 : len;
|
|
||||||
}
|
|
||||||
|
|
||||||
public StreamLoadFormat getStreamLoadFormat() {
|
|
||||||
Map<String, Object> loadProps = getLoadProps();
|
|
||||||
if (null == loadProps) {
|
|
||||||
return StreamLoadFormat.CSV;
|
|
||||||
}
|
|
||||||
if (loadProps.containsKey(KEY_LOAD_PROPS_FORMAT)
|
|
||||||
&& StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(KEY_LOAD_PROPS_FORMAT)))) {
|
|
||||||
return StreamLoadFormat.JSON;
|
|
||||||
}
|
|
||||||
return StreamLoadFormat.CSV;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void validateStreamLoadUrl() {
|
|
||||||
List<String> urlList = getLoadUrlList();
|
|
||||||
for (String host : urlList) {
|
|
||||||
if (host.split(":").length < 2) {
|
|
||||||
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
|
|
||||||
"loadUrl的格式不正确,请输入 `fe_ip:fe_http_ip;fe_ip:fe_http_ip`。");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void validateRequired() {
|
|
||||||
final String[] requiredOptionKeys = new String[]{
|
|
||||||
KEY_USERNAME,
|
|
||||||
KEY_DATABASE,
|
|
||||||
KEY_TABLE,
|
|
||||||
KEY_COLUMN,
|
|
||||||
KEY_LOAD_URL
|
|
||||||
};
|
|
||||||
for (String optionKey : requiredOptionKeys) {
|
|
||||||
options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -0,0 +1,177 @@
|
|||||||
|
package com.alibaba.datax.plugin.writer.doriswriter;
|
||||||
|
|
||||||
|
import com.alibaba.datax.common.exception.DataXException;
|
||||||
|
import com.alibaba.datax.common.util.Configuration;
|
||||||
|
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
|
||||||
|
|
||||||
|
import java.io.Serializable;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
public class Keys implements Serializable {
|
||||||
|
|
||||||
|
private static final long serialVersionUID = 1l;
|
||||||
|
private static final int MAX_RETRIES = 3;
|
||||||
|
private static final int BATCH_ROWS = 500000;
|
||||||
|
private static final long DEFAULT_FLUSH_INTERVAL = 30000;
|
||||||
|
|
||||||
|
private static final String LOAD_PROPS_FORMAT = "format";
|
||||||
|
public enum StreamLoadFormat {
|
||||||
|
CSV, JSON;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final String USERNAME = "username";
|
||||||
|
private static final String PASSWORD = "password";
|
||||||
|
private static final String DATABASE = "connection[0].selectedDatabase";
|
||||||
|
private static final String TABLE = "connection[0].table[0]";
|
||||||
|
private static final String COLUMN = "column";
|
||||||
|
private static final String PRE_SQL = "preSql";
|
||||||
|
private static final String POST_SQL = "postSql";
|
||||||
|
private static final String JDBC_URL = "connection[0].jdbcUrl";
|
||||||
|
private static final String LABEL_PREFIX = "labelPrefix";
|
||||||
|
private static final String MAX_BATCH_ROWS = "maxBatchRows";
|
||||||
|
private static final String MAX_BATCH_SIZE = "maxBatchSize";
|
||||||
|
private static final String FLUSH_INTERVAL = "flushInterval";
|
||||||
|
private static final String LOAD_URL = "loadUrl";
|
||||||
|
private static final String FLUSH_QUEUE_LENGTH = "flushQueueLength";
|
||||||
|
private static final String LOAD_PROPS = "loadProps";
|
||||||
|
|
||||||
|
private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_";
|
||||||
|
|
||||||
|
private static final long DEFAULT_MAX_BATCH_SIZE = 90 * 1024 * 1024; //default 90M
|
||||||
|
|
||||||
|
private final Configuration options;
|
||||||
|
|
||||||
|
private List<String> infoSchemaColumns;
|
||||||
|
private List<String> userSetColumns;
|
||||||
|
private boolean isWildcardColumn;
|
||||||
|
|
||||||
|
public Keys ( Configuration options) {
|
||||||
|
this.options = options;
|
||||||
|
this.userSetColumns = options.getList(COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList());
|
||||||
|
if (1 == options.getList(COLUMN, String.class).size() && "*".trim().equals(options.getList(COLUMN, String.class).get(0))) {
|
||||||
|
this.isWildcardColumn = true;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void doPretreatment() {
|
||||||
|
validateRequired();
|
||||||
|
validateStreamLoadUrl();
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getJdbcUrl() {
|
||||||
|
return options.getString(JDBC_URL);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getDatabase() {
|
||||||
|
return options.getString(DATABASE);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getTable() {
|
||||||
|
return options.getString(TABLE);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getUsername() {
|
||||||
|
return options.getString(USERNAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getPassword() {
|
||||||
|
return options.getString(PASSWORD);
|
||||||
|
}
|
||||||
|
|
||||||
|
public String getLabelPrefix() {
|
||||||
|
String label = options.getString(LABEL_PREFIX);
|
||||||
|
return null == label ? DEFAULT_LABEL_PREFIX : label;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getLoadUrlList() {
|
||||||
|
return options.getList(LOAD_URL, String.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getColumns() {
|
||||||
|
if (isWildcardColumn) {
|
||||||
|
return this.infoSchemaColumns;
|
||||||
|
}
|
||||||
|
return this.userSetColumns;
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isWildcardColumn() {
|
||||||
|
return this.isWildcardColumn;
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setInfoCchemaColumns(List<String> cols) {
|
||||||
|
this.infoSchemaColumns = cols;
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getPreSqlList() {
|
||||||
|
return options.getList(PRE_SQL, String.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<String> getPostSqlList() {
|
||||||
|
return options.getList(POST_SQL, String.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Map<String, Object> getLoadProps() {
|
||||||
|
return options.getMap(LOAD_PROPS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getMaxRetries() {
|
||||||
|
return MAX_RETRIES;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getBatchRows() {
|
||||||
|
Integer rows = options.getInt(MAX_BATCH_ROWS);
|
||||||
|
return null == rows ? BATCH_ROWS : rows;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getBatchSize() {
|
||||||
|
Long size = options.getLong(MAX_BATCH_SIZE);
|
||||||
|
return null == size ? DEFAULT_MAX_BATCH_SIZE : size;
|
||||||
|
}
|
||||||
|
|
||||||
|
public long getFlushInterval() {
|
||||||
|
Long interval = options.getLong(FLUSH_INTERVAL);
|
||||||
|
return null == interval ? DEFAULT_FLUSH_INTERVAL : interval;
|
||||||
|
}
|
||||||
|
|
||||||
|
public int getFlushQueueLength() {
|
||||||
|
Integer len = options.getInt(FLUSH_QUEUE_LENGTH);
|
||||||
|
return null == len ? 1 : len;
|
||||||
|
}
|
||||||
|
|
||||||
|
public StreamLoadFormat getStreamLoadFormat() {
|
||||||
|
Map<String, Object> loadProps = getLoadProps();
|
||||||
|
if (null == loadProps) {
|
||||||
|
return StreamLoadFormat.CSV;
|
||||||
|
}
|
||||||
|
if (loadProps.containsKey(LOAD_PROPS_FORMAT)
|
||||||
|
&& StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(LOAD_PROPS_FORMAT)))) {
|
||||||
|
return StreamLoadFormat.JSON;
|
||||||
|
}
|
||||||
|
return StreamLoadFormat.CSV;
|
||||||
|
}
|
||||||
|
|
||||||
|
private void validateStreamLoadUrl() {
|
||||||
|
List<String> urlList = getLoadUrlList();
|
||||||
|
for (String host : urlList) {
|
||||||
|
if (host.split(":").length < 2) {
|
||||||
|
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR,
|
||||||
|
"The format of loadUrl is not correct, please enter:[`fe_ip:fe_http_ip;fe_ip:fe_http_ip`].");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void validateRequired() {
|
||||||
|
final String[] requiredOptionKeys = new String[]{
|
||||||
|
USERNAME,
|
||||||
|
DATABASE,
|
||||||
|
TABLE,
|
||||||
|
COLUMN,
|
||||||
|
LOAD_URL
|
||||||
|
};
|
||||||
|
for (String optionKey : requiredOptionKeys) {
|
||||||
|
options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -2,12 +2,12 @@ package com.alibaba.datax.plugin.writer.doriswriter;
|
|||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
public class DorisWriterTuple {
|
public class WriterTuple {
|
||||||
private String label;
|
private String label;
|
||||||
private Long bytes;
|
private Long bytes;
|
||||||
private List<byte[]> rows;
|
private List<byte[]> rows;
|
||||||
|
|
||||||
public DorisWriterTuple(String label,Long bytes,List<byte[]> rows){
|
public WriterTuple ( String label, Long bytes, List<byte[]> rows){
|
||||||
this.label = label;
|
this.label = label;
|
||||||
this.rows = rows;
|
this.rows = rows;
|
||||||
this.bytes = bytes;
|
this.bytes = bytes;
|
Loading…
Reference in New Issue
Block a user