Support CSV format import
This commit is contained in:
jiafeng.zhang 2022-09-29 09:25:25 +08:00
parent 79db76dd8f
commit 6c41fe832b
10 changed files with 245 additions and 106 deletions

View File

@ -1,5 +1,4 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
@ -18,12 +17,10 @@ KIND, either express or implied. See the License for the
specific language governing permissions and limitations
under the License.
-->
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<assembly xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id/>
<formats>
<format>dir</format>
</formats>
@ -45,7 +42,6 @@ under the License.
<outputDirectory>plugin/writer/doriswriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>

View File

@ -22,16 +22,17 @@ import com.alibaba.datax.common.element.DateColumn;
import com.alibaba.datax.common.element.Record;
import org.apache.commons.lang3.time.DateFormatUtils;
import java.time.ZoneId;
import java.util.List;
import java.util.TimeZone;
public abstract class DorisCodec {
protected static String timeZone = "GMT+8";
protected static TimeZone timeZoner = TimeZone.getTimeZone(timeZone);
protected final TimeZone timeZone;
protected final List<String> fieldNames;
public DorisCodec(final List<String> fieldNames) {
public DorisCodec(final List<String> fieldNames, final String timeZone) {
this.fieldNames = fieldNames;
this.timeZone = TimeZone.getTimeZone(ZoneId.of(timeZone));
}
public abstract String serialize(Record row);
@ -60,9 +61,9 @@ public abstract class DorisCodec {
final DateColumn.DateType dateType = ((DateColumn) col).getSubType();
switch (dateType) {
case DATE:
return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd", timeZoner);
return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd", timeZone);
case DATETIME:
return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd HH:mm:ss", timeZoner);
return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd HH:mm:ss", timeZone);
default:
return col.asString();
}

View File

@ -0,0 +1,49 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.alibaba.datax.plugin.writer.doriswriter;
import com.alibaba.datax.common.element.Record;
import java.util.ArrayList;
import java.util.List;
/**
 * Codec that serializes a DataX {@link Record} into one CSV line for
 * Doris stream load.
 *
 * The configured column separator may contain hex escape sequences such
 * as "\x01"; it is decoded once in the constructor via
 * {@link EscapeHandler#escapeString(String)}. Null column values are
 * written as the literal two-character token "\N", which Doris treats as
 * SQL NULL during stream load.
 */
public class DorisCsvCodec extends DorisCodec {

    // Already escape-decoded; joined between columns when serializing.
    private final String columnSeparator;

    public DorisCsvCodec(final List<String> fieldNames, String columnSeparator, String timeZone) {
        super(fieldNames, timeZone);
        // Decode sequences like "\x01" into the real separator character.
        this.columnSeparator = EscapeHandler.escapeString(columnSeparator);
    }

    /**
     * Builds a single CSV row: one converted value per configured field,
     * in field order, joined by the column separator.
     *
     * @param row the incoming DataX record
     * @return the serialized line, or "" when no field names are configured
     */
    @Override
    public String serialize(final Record row) {
        if (null == this.fieldNames) {
            return "";
        }
        final StringBuilder line = new StringBuilder();
        for (int idx = 0; idx < this.fieldNames.size(); idx++) {
            if (idx > 0) {
                line.append(columnSeparator);
            }
            final Object converted = this.convertColumn(row.getColumn(idx));
            // "\N" is Doris' NULL marker for CSV stream load.
            line.append(converted == null ? "\\N" : converted.toString());
        }
        return line.toString();
    }
}

View File

@ -17,15 +17,20 @@
package com.alibaba.datax.plugin.writer.doriswriter;
import java.util.ArrayList;
import java.util.List;
// A wrapper class to hold a batch of loaded rows
public class DorisFlushBatch {
private String lineDelimiter;
private final String format;
private final String lineDelimiter;
private String label;
private long rows = 0;
private StringBuilder data = new StringBuilder();
private long byteSize = 0;
private List<String> data = new ArrayList<>();
public DorisFlushBatch(String lineDelimiter) {
this.lineDelimiter = lineDelimiter;
public DorisFlushBatch(String lineDelimiter, String format) {
this.lineDelimiter = EscapeHandler.escapeString(lineDelimiter);
this.format = format;
}
public void setLabel(String label) {
@ -37,22 +42,25 @@ public class DorisFlushBatch {
}
public long getRows() {
return rows;
return data.size();
}
public void putData(String row) {
if (data.length() > 0) {
data.append(lineDelimiter);
}
data.append(row);
rows++;
data.add(row);
byteSize += row.getBytes().length;
}
public StringBuilder getData() {
return data;
public String getData() {
String result;
if (Key.DEFAULT_FORMAT_CSV.equalsIgnoreCase(format)) {
result = String.join(this.lineDelimiter, data);
} else {
result = data.toString();
}
return result;
}
public long getSize() {
return data.length();
return byteSize;
}
}

View File

@ -28,8 +28,8 @@ import java.util.Map;
public class DorisJsonCodec extends DorisCodec {
private Map<String, Object> rowMap;
public DorisJsonCodec(final List<String> fieldNames) {
super(fieldNames);
public DorisJsonCodec(final List<String> fieldNames, final String timeZone) {
super(fieldNames, timeZone);
this.rowMap = new HashMap<>(this.fieldNames.size());
}

View File

@ -48,6 +48,7 @@ public class DorisWriter extends Writer {
private Key keys;
private DorisCodec rowCodec;
private int batchNum = 0;
private String labelPrefix;
public Task() {
}
@ -55,7 +56,12 @@ public class DorisWriter extends Writer {
@Override
public void init() {
this.keys = new Key(super.getPluginJobConf());
this.rowCodec = new DorisJsonCodec(this.keys.getColumns());
if (Key.DEFAULT_FORMAT_CSV.equalsIgnoreCase(this.keys.getFormat())) {
this.rowCodec = new DorisCsvCodec(this.keys.getColumns(), this.keys.getColumnSeparator(), this.keys.getTimeZone());
} else {
this.rowCodec = new DorisJsonCodec(this.keys.getColumns(), this.keys.getTimeZone());
}
this.labelPrefix = this.keys.getLabelPrefix() + UUID.randomUUID();
this.dorisWriterEmitter = new DorisWriterEmitter(keys);
}
@ -66,7 +72,7 @@ public class DorisWriter extends Writer {
@Override
public void startWrite(RecordReceiver recordReceiver) {
String lineDelimiter = this.keys.getLineDelimiter();
DorisFlushBatch flushBatch = new DorisFlushBatch(lineDelimiter);
DorisFlushBatch flushBatch = new DorisFlushBatch(lineDelimiter, this.keys.getFormat());
long batchCount = 0;
long batchByteSize = 0L;
Record record;
@ -93,7 +99,7 @@ public class DorisWriter extends Writer {
// clear buffer
batchCount = 0;
batchByteSize = 0L;
flushBatch = new DorisFlushBatch(lineDelimiter);
flushBatch = new DorisFlushBatch(lineDelimiter, this.keys.getFormat());
}
} // end of while
@ -103,14 +109,12 @@ public class DorisWriter extends Writer {
}
private void flush(DorisFlushBatch flushBatch) {
final String label = getStreamLoadLabel();
flushBatch.setLabel(label);
dorisWriterEmitter.doStreamLoad(flushBatch);
flushBatch.setLabel(getStreamLoadLabel());
dorisWriterEmitter.emit(flushBatch);
}
private String getStreamLoadLabel() {
String labelPrefix = this.keys.getLabelPrefix();
return labelPrefix + UUID.randomUUID().toString() + "_" + (batchNum++);
return labelPrefix + "_" + (batchNum++);
}
@Override

View File

@ -23,6 +23,7 @@ import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.fastjson.JSON;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.HttpEntity;
import org.apache.http.HttpHeaders;
import org.apache.http.HttpRequest;
@ -47,12 +48,11 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.net.HttpURLConnection;
import java.net.URI;
import java.net.URL;
import java.util.Base64;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
// Used to load batch of rows to Doris using stream load
public class DorisWriterEmitter {
@ -88,13 +88,28 @@ public class DorisWriterEmitter {
}
}
public void emit(final DorisFlushBatch flushData) {
String host = this.getAvailableHost();
for (int i = 0; i <= this.keys.getMaxRetries(); i++) {
try {
doStreamLoad(flushData, host);
return;
} catch (DataXException ex) {
if (i >= this.keys.getMaxRetries()) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, ex);
}
LOG.error("StreamLoad error, switch host {} and retry: ", host, ex);
host = this.getAvailableHost();
}
}
}
/**
* execute doris stream load
*/
public void doStreamLoad(final DorisFlushBatch flushData) {
private void doStreamLoad(final DorisFlushBatch flushData, String host) {
long start = System.currentTimeMillis();
final String host = this.getAvailableHost();
if (null == host) {
if (StringUtils.isEmpty(host)) {
throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, "None of the load url can be connected.");
}
final String loadUrl = host + "/api/" + this.keys.getDatabase() + "/" + this.keys.getTable() + "/_stream_load";
@ -130,28 +145,12 @@ public class DorisWriterEmitter {
while (this.hostPos < targetHosts.size()) {
final String host = targetHosts.get(hostPos);
++this.hostPos;
if (this.tryHttpConnection(host)) {
return host;
}
return host;
}
return null;
}
private boolean tryHttpConnection(final String host) {
try {
final URL url = new URL(host);
final HttpURLConnection co = (HttpURLConnection) url.openConnection();
co.setConnectTimeout(1000);
co.connect();
co.disconnect();
return true;
} catch (Exception e) {
LOG.warn("Failed to connect to address:{} , Exception ={}", host, e);
return false;
}
}
private Map<String, Object> doHttpPut(final String loadUrl, final DorisFlushBatch flushBatch) throws IOException {
LOG.info(String.format("Executing stream load to: '%s', size: %s, rows: %d",
loadUrl, flushBatch.getSize(), flushBatch.getRows()));
@ -181,10 +180,12 @@ public class DorisWriterEmitter {
final HttpPut httpPut = new HttpPut(loadUrl);
final List<String> cols = this.keys.getColumns();
if (null != cols && !cols.isEmpty()) {
httpPut.setHeader("columns", String.join(",", cols));
httpPut.setHeader("columns", String.join(",", cols.stream().map(item -> String.format("`%s`", item.trim().replace("`", ""))).collect(Collectors.toList())));
}
// put loadProps to http header
//set default header
setDefaultHeader(httpPut);
// put custom loadProps to http header
final Map<String, Object> loadProps = this.keys.getLoadProps();
if (null != loadProps) {
for (final Map.Entry<String, Object> entry : loadProps.entrySet()) {
@ -196,14 +197,9 @@ public class DorisWriterEmitter {
httpPut.setHeader(HttpHeaders.EXPECT, "100-continue");
httpPut.setHeader(HttpHeaders.AUTHORIZATION, this.getBasicAuthHeader(this.keys.getUsername(), this.keys.getPassword()));
httpPut.setHeader("label", flushBatch.getLabel());
httpPut.setHeader("format", "json");
httpPut.setHeader("line_delimiter", this.keys.getLineDelimiterDesc());
httpPut.setHeader("read_json_by_line", "true");
httpPut.setHeader("fuzzy_parse", "true");
// Use ByteArrayEntity instead of StringEntity to handle Chinese correctly
httpPut.setEntity(new ByteArrayEntity(flushBatch.getData().toString().getBytes()));
httpPut.setEntity(new ByteArrayEntity(flushBatch.getData().getBytes()));
httpPut.setConfig(requestConfig);
try (final CloseableHttpResponse resp = httpclient.execute(httpPut)) {
@ -222,6 +218,21 @@ public class DorisWriterEmitter {
}
}
/**
* Set default request headers in json and csv formats.
* csv default delimiters are \x01 and \x02
*/
private void setDefaultHeader(HttpPut httpPut) {
if (Key.DEFAULT_FORMAT_CSV.equalsIgnoreCase(this.keys.getFormat())) {
httpPut.setHeader("line_delimiter", this.keys.getLineDelimiter());
httpPut.setHeader("column_separator", this.keys.getColumnSeparator());
} else {
httpPut.setHeader("format", "json");
httpPut.setHeader("strip_outer_array", "true");
httpPut.setHeader("fuzzy_parse", "true");
}
}
private String getBasicAuthHeader(final String username, final String password) {
final String auth = username + ":" + password;
final byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes());
@ -231,44 +242,51 @@ public class DorisWriterEmitter {
// for test
public static void main(String[] args) throws IOException {
String json = "{\n" +
" \"feLoadUrl\": [\"127.0.0.1:8030\"],\n" +
" \"column\": [\"k1\", \"k2\", \"k3\"],\n" +
" \"database\": \"db1\",\n" +
" \"beLoadUrl\": [\"127.0.0.1:8040\"],\n" +
" \"column\": [\"name\", \"age\", \"cdate\", \"cdatetime\"],\n" +
" \"database\": \"test\",\n" +
" \"jdbcUrl\": \"jdbc:mysql://127.0.0.1:9030/\",\n" +
" \"loadProps\": {\n" +
// " \"line_delimiter\": \"\\\\x03\",\n" +
// " \"column_separator\": \"\\\\x04\",\n" +
" },\n" +
" \"password\": \"12345\",\n" +
" \"format\": \"csv\",\n" +
" \"password\": \"\",\n" +
" \"postSql\": [],\n" +
" \"preSql\": [],\n" +
" \"table\": \"t1\",\n" +
" \"table\": \"test_datax\",\n" +
" \"maxRetries\": \"0\",\n" +
" \"username\": \"root\"\n" +
" }";
Configuration configuration = Configuration.from(json);
Key key = new Key(configuration);
DorisWriterEmitter emitter = new DorisWriterEmitter(key);
DorisFlushBatch flushBatch = new DorisFlushBatch("\n");
flushBatch.setLabel("test4");
DorisFlushBatch flushBatch = new DorisFlushBatch(key.getLineDelimiter(), key.getFormat());
Map<String, String> row1 = Maps.newHashMap();
row1.put("k1", "2021-02-02");
row1.put("k2", "2021-02-02 00:00:00");
row1.put("k3", "3");
String rowStr1 = JSON.toJSONString(row1);
System.out.println("rows1: " + rowStr1);
flushBatch.putData(rowStr1);
row1.put("cdate", "2021-02-02");
row1.put("cdatetime", "2021-02-02 00:00:00");
row1.put("name", "zhangsan");
row1.put("age", "18");
Map<String, String> row2 = Maps.newHashMap();
row2.put("k1", "2021-02-03");
row2.put("k2", "2021-02-03 00:00:00");
row2.put("k3", "4");
row2.put("cdate", "2022-02-02");
row2.put("cdatetime", "2022-02-02 10:00:00");
row2.put("name", "lisi");
row2.put("age", "180");
String rowStr1 = JSON.toJSONString(row1);
String rowStr2 = JSON.toJSONString(row2);
if ("csv".equals(key.getFormat())) {
rowStr1 = String.join(EscapeHandler.escapeString(key.getColumnSeparator()), "2021-02-02", "2021-02-02 00:00:00", "zhangsan", "18");
rowStr2 = String.join(EscapeHandler.escapeString(key.getColumnSeparator()), "2022-02-02", "2022-02-02 10:00:00", "lisi", "180");
}
System.out.println("rows1: " + rowStr1);
System.out.println("rows2: " + rowStr2);
flushBatch.putData(rowStr2);
for (int i = 0; i < 500000; ++i) {
for (int i = 0; i < 1; ++i) {
flushBatch.putData(rowStr1);
flushBatch.putData(rowStr2);
}
emitter.doStreamLoad(flushBatch);
emitter.emit(flushBatch);
}
}

View File

@ -0,0 +1,42 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.
package com.alibaba.datax.plugin.writer.doriswriter;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
/**
* Handler for escape in properties.
*/
/**
 * Handler for escape sequences in delimiter properties.
 *
 * Stream-load delimiters are commonly configured as hex escapes such as
 * "\x01" or "\x02"; this class decodes every "\xHH" sequence in a
 * property value into the actual character before it is used as a
 * separator.
 */
public class EscapeHandler {
    public static final String ESCAPE_DELIMITERS_FLAGS = "\\x";
    // Exactly two hex digits after "\x". The previous character class
    // [0-9|a-f|A-F] also matched a literal '|', so input like "\x||"
    // matched and made Integer.parseInt throw NumberFormatException.
    public static final Pattern ESCAPE_PATTERN = Pattern.compile("\\\\x([0-9a-fA-F]{2})");

    /**
     * Decodes all "\xHH" escape sequences in {@code source}.
     *
     * @param source the raw property value (e.g. a configured separator)
     * @return the value with every escape sequence replaced by its
     *         character; returned unchanged when it contains no "\x"
     */
    public static String escapeString(String source) {
        // contains() instead of startsWith(): escapes are decoded anywhere
        // in the string, not only when the value begins with "\x".
        if (source.contains(ESCAPE_DELIMITERS_FLAGS)) {
            Matcher m = ESCAPE_PATTERN.matcher(source);
            StringBuffer buf = new StringBuffer();
            while (m.find()) {
                char decoded = (char) Integer.parseInt(m.group(1), 16);
                // quoteReplacement is required: a decoded '$' or '\'
                // (e.g. "\x24") would otherwise be interpreted by
                // appendReplacement as a group reference and throw.
                m.appendReplacement(buf, Matcher.quoteReplacement(String.valueOf(decoded)));
            }
            m.appendTail(buf);
            return buf.toString();
        }
        return source;
    }
}

View File

@ -23,6 +23,8 @@ import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.google.common.base.Strings;
import java.io.Serializable;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@ -34,6 +36,7 @@ public class Key implements Serializable {
public static final String DATABASE = "database";
public static final String TABLE = "table";
public static final String COLUMN = "column";
public static final String TIME_ZONE = "timeZone";
public static final String USERNAME = "username";
public static final String PASSWORD = "password";
@ -42,28 +45,36 @@ public class Key implements Serializable {
public static final String POST_SQL = "postSql";
public static final String LOAD_PROPS = "loadProps";
public static final String LOAD_PROPS_LINE_DELIMITER = "line_delimiter";
public static final String LOAD_PROPS_COLUMN_SEPARATOR = "column_separator";
public static final String MAX_BATCH_ROWS = "maxBatchRows";
public static final String MAX_BATCH_BYTE_SIZE = "maxBatchByteSize";
public static final String MAX_RETRIES = "maxRetries";
public static final String LABEL_PREFIX = "labelPrefix";
public static final String LINE_DELIMITER = "lineDelimiter";
public static final String FORMAT = "format";
public static final String CONNECT_TIMEOUT = "connectTimeout";
private final Configuration options;
private final String lineDelimiterDesc;
private static final long DEFAULT_MAX_BATCH_ROWS = 50_0000;
private static final long DEFAULT_MAX_BATCH_BYTE_SIZE = 100 * 1024 * 1024; // 100MB
private static final long DEFAULT_MAX_BATCH_BYTE_SIZE = 90 * 1024 * 1024; // 90MB
private static final int DEFAULT_MAX_RETRIES = 0;
private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_";
private static final String DEFAULT_LINE_DELIMITER = "\n";
private static final String DEFAULT_COLUMN_SEPARATOR = "\\x01";
private static final String DEFAULT_LINE_DELIMITER = "\\x02";
public static final String DEFAULT_FORMAT_CSV = "csv";
private static final String DEFAULT_TIME_ZONE = "+08:00";
private static final int DEFAULT_CONNECT_TIMEOUT = -1;
public Key(final Configuration options) {
this.options = options;
this.lineDelimiterDesc = parseHexReadable(this.getLineDelimiter());
}
public void doPretreatment() {
this.validateRequired();
this.validateStreamLoadUrl();
this.validateFormat();
}
public String getJdbcUrl() {
@ -98,6 +109,10 @@ public class Key implements Serializable {
return this.options.getList(COLUMN, String.class);
}
public String getTimeZone() {
return this.options.getString(TIME_ZONE, DEFAULT_TIME_ZONE);
}
public List<String> getPreSqlList() {
return this.options.getList(PRE_SQL, String.class);
}
@ -107,7 +122,7 @@ public class Key implements Serializable {
}
public Map<String, Object> getLoadProps() {
return this.options.getMap(LOAD_PROPS);
return this.options.getMap(LOAD_PROPS, new HashMap<>());
}
public long getBatchRows() {
@ -118,22 +133,30 @@ public class Key implements Serializable {
return this.options.getLong(MAX_BATCH_BYTE_SIZE, DEFAULT_MAX_BATCH_BYTE_SIZE);
}
public int getMaxRetries() {
return this.options.getInt(MAX_RETRIES, DEFAULT_MAX_RETRIES);
}
public String getLabelPrefix() {
return this.options.getString(LABEL_PREFIX, DEFAULT_LABEL_PREFIX);
}
public String getLineDelimiter() {
return this.options.getString(LINE_DELIMITER, DEFAULT_LINE_DELIMITER);
return getLoadProps().getOrDefault(LOAD_PROPS_LINE_DELIMITER, DEFAULT_LINE_DELIMITER).toString();
}
public String getFormat() {
return this.options.getString(FORMAT, DEFAULT_FORMAT_CSV);
}
public String getColumnSeparator() {
return getLoadProps().getOrDefault(LOAD_PROPS_COLUMN_SEPARATOR, DEFAULT_COLUMN_SEPARATOR).toString();
}
public int getConnectTimeout() {
return this.options.getInt(CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT);
}
public String getLineDelimiterDesc() {
return lineDelimiterDesc;
}
private void validateStreamLoadUrl() {
List<String> urlList = this.getBeLoadUrlList();
if (urlList == null) {
@ -152,14 +175,11 @@ public class Key implements Serializable {
}
}
private String parseHexReadable(String s) {
byte[] separatorBytes = s.getBytes();
StringBuilder desc = new StringBuilder();
for (byte separatorByte : separatorBytes) {
desc.append(String.format("\\x%02x", separatorByte));
private void validateFormat() {
String format = this.getFormat();
if (!Arrays.asList("csv", "json").contains(format.toLowerCase())) {
throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, "format only supports csv or json");
}
return desc.toString();
}
private void validateRequired() {

View File

@ -6,6 +6,7 @@
"database": "",
"table": "",
"column": [],
"timeZone": "",
"preSql": [],
"postSql": [],
"jdbcUrl": "",