From 6c41fe832b79675171a7fefe4777c3a4887e1b39 Mon Sep 17 00:00:00 2001 From: "jiafeng.zhang" Date: Thu, 29 Sep 2022 09:25:25 +0800 Subject: [PATCH] support csv format import support csv format import --- doriswriter/src/main/assembly/package.xml | 12 +- .../plugin/writer/doriswriter/DorisCodec.java | 11 +- .../writer/doriswriter/DorisCsvCodec.java | 49 +++++++ .../writer/doriswriter/DorisFlushBatch.java | 36 ++++-- .../writer/doriswriter/DorisJsonCodec.java | 4 +- .../writer/doriswriter/DorisWriter.java | 20 +-- .../doriswriter/DorisWriterEmitter.java | 120 ++++++++++-------- .../writer/doriswriter/EscapeHandler.java | 42 ++++++ .../datax/plugin/writer/doriswriter/Key.java | 56 +++++--- .../main/resources/plugin_job_template.json | 1 + 10 files changed, 245 insertions(+), 106 deletions(-) create mode 100644 doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java create mode 100644 doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/EscapeHandler.java diff --git a/doriswriter/src/main/assembly/package.xml b/doriswriter/src/main/assembly/package.xml index 9fca5e93..71596332 100644 --- a/doriswriter/src/main/assembly/package.xml +++ b/doriswriter/src/main/assembly/package.xml @@ -1,5 +1,4 @@ - - - - + + dir @@ -45,7 +42,6 @@ under the License. plugin/writer/doriswriter - false diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java index 9a364f40..51bc6881 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCodec.java @@ -22,16 +22,17 @@ import com.alibaba.datax.common.element.DateColumn; import com.alibaba.datax.common.element.Record; import org.apache.commons.lang3.time.DateFormatUtils; +import java.time.ZoneId; import java.util.List; import java.util.TimeZone; public abstract class DorisCodec { - protected static String timeZone = "GMT+8"; - protected static TimeZone timeZoner = TimeZone.getTimeZone(timeZone); + protected final TimeZone timeZone; protected final List fieldNames; - public DorisCodec(final List fieldNames) { + public DorisCodec(final List fieldNames, final String timeZone) { this.fieldNames = fieldNames; + this.timeZone = TimeZone.getTimeZone(ZoneId.of(timeZone)); } public abstract String serialize(Record row); @@ -60,9 +61,9 @@ public abstract class DorisCodec { final DateColumn.DateType dateType = ((DateColumn) col).getSubType(); switch (dateType) { case DATE: - return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd", timeZoner); + return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd", timeZone); case DATETIME: - return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd HH:mm:ss", timeZoner); + return DateFormatUtils.format(col.asDate(), "yyyy-MM-dd HH:mm:ss", timeZone); default: return col.asString(); } diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java new file mode 100644 index 00000000..d433af38 --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisCsvCodec.java @@ -0,0 +1,49 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.alibaba.datax.plugin.writer.doriswriter; + +import com.alibaba.datax.common.element.Record; + +import java.util.ArrayList; +import java.util.List; + +public class DorisCsvCodec extends DorisCodec { + + private final String columnSeparator; + + public DorisCsvCodec(final List fieldNames, String columnSeparator, String timeZone) { + super(fieldNames, timeZone); + this.columnSeparator = EscapeHandler.escapeString(columnSeparator); + } + + @Override + public String serialize(final Record row) { + if (null == this.fieldNames) { + return ""; + } + List list = new ArrayList<>(); + + for (int i = 0; i < this.fieldNames.size(); i++) { + Object value = this.convertColumn(row.getColumn(i)); + list.add(value != null ? value.toString() : "\\N"); + } + + return String.join(columnSeparator, list); + } + +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java index 493ddec9..9980c937 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisFlushBatch.java @@ -17,15 +17,20 @@ package com.alibaba.datax.plugin.writer.doriswriter; +import java.util.ArrayList; +import java.util.List; + // A wrapper class to hold a batch of loaded rows public class DorisFlushBatch { - private String lineDelimiter; + private final String format; + private final String lineDelimiter; private String label; - private long rows = 0; - private StringBuilder data = new StringBuilder(); + private long byteSize = 0; + private List data = new ArrayList<>(); - public DorisFlushBatch(String lineDelimiter) { - this.lineDelimiter = lineDelimiter; + public DorisFlushBatch(String lineDelimiter, String format) { + this.lineDelimiter = EscapeHandler.escapeString(lineDelimiter); + this.format = format; } public void setLabel(String label) { @@ -37,22 +42,25 @@ public class DorisFlushBatch { } public long getRows() { - return rows; + return data.size(); } public void putData(String row) { - if (data.length() > 0) { - data.append(lineDelimiter); - } - data.append(row); - rows++; + data.add(row); + byteSize += row.getBytes().length; } - public StringBuilder getData() { - return data; + public String getData() { + String result; + if (Key.DEFAULT_FORMAT_CSV.equalsIgnoreCase(format)) { + result = String.join(this.lineDelimiter, data); + } else { + result = data.toString(); + } + return result; } public long getSize() { - return data.length(); + return byteSize; } } diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java index 8d4568c2..664b9d36 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisJsonCodec.java @@ -28,8 +28,8 @@ import java.util.Map; public class DorisJsonCodec extends DorisCodec { private Map rowMap; - public DorisJsonCodec(final List fieldNames) { - super(fieldNames); + public DorisJsonCodec(final List fieldNames, final String timeZone) { + super(fieldNames, timeZone); this.rowMap = new HashMap<>(this.fieldNames.size()); } diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java index 71e2d1ad..a2bdfd7e 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriter.java @@ -48,6 +48,7 @@ public class DorisWriter extends Writer { private Key keys; private DorisCodec rowCodec; private int batchNum = 0; + private String labelPrefix; public Task() { } @@ -55,7 +56,12 @@ public class DorisWriter extends Writer { @Override public void init() { this.keys = new Key(super.getPluginJobConf()); - this.rowCodec = new DorisJsonCodec(this.keys.getColumns()); + if (Key.DEFAULT_FORMAT_CSV.equalsIgnoreCase(this.keys.getFormat())) { + this.rowCodec = new DorisCsvCodec(this.keys.getColumns(), this.keys.getColumnSeparator(), this.keys.getTimeZone()); + } else { + this.rowCodec = new DorisJsonCodec(this.keys.getColumns(), this.keys.getTimeZone()); + } + this.labelPrefix = this.keys.getLabelPrefix() + UUID.randomUUID(); this.dorisWriterEmitter = new DorisWriterEmitter(keys); } @@ -66,7 +72,7 @@ public class DorisWriter extends Writer { @Override public void startWrite(RecordReceiver recordReceiver) { String lineDelimiter = this.keys.getLineDelimiter(); - DorisFlushBatch flushBatch = new DorisFlushBatch(lineDelimiter); + DorisFlushBatch flushBatch = new DorisFlushBatch(lineDelimiter, this.keys.getFormat()); long batchCount = 0; long batchByteSize = 0L; Record record; @@ -93,7 +99,7 @@ public class DorisWriter extends Writer { // clear buffer batchCount = 0; batchByteSize = 0L; - flushBatch = new DorisFlushBatch(lineDelimiter); + flushBatch = new DorisFlushBatch(lineDelimiter, this.keys.getFormat()); } } // end of while @@ -103,14 +109,12 @@ public class DorisWriter extends Writer { } private void flush(DorisFlushBatch flushBatch) { - final String label = getStreamLoadLabel(); - flushBatch.setLabel(label); - dorisWriterEmitter.doStreamLoad(flushBatch); + flushBatch.setLabel(getStreamLoadLabel()); + dorisWriterEmitter.emit(flushBatch); } private String getStreamLoadLabel() { - String labelPrefix = this.keys.getLabelPrefix(); - return labelPrefix + UUID.randomUUID().toString() + "_" + (batchNum++); + return labelPrefix + "_" + (batchNum++); } @Override diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java index fb4e46b0..ccd9fc4d 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/DorisWriterEmitter.java @@ -23,6 +23,7 @@ import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; import com.alibaba.fastjson.JSON; import com.google.common.collect.Lists; import com.google.common.collect.Maps; +import org.apache.commons.lang3.StringUtils; import org.apache.http.HttpEntity; import org.apache.http.HttpHeaders; import org.apache.http.HttpRequest; @@ -47,12 +48,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.io.IOException; -import java.net.HttpURLConnection; import java.net.URI; -import java.net.URL; import java.util.Base64; import java.util.List; import java.util.Map; +import java.util.stream.Collectors; // Used to load batch of rows to Doris using stream load public class DorisWriterEmitter { @@ -88,13 +88,28 @@ public class DorisWriterEmitter { } } + public void emit(final DorisFlushBatch flushData) { + String host = this.getAvailableHost(); + for (int i = 0; i <= this.keys.getMaxRetries(); i++) { + try { + doStreamLoad(flushData, host); + return; + } catch (DataXException ex) { + if (i >= this.keys.getMaxRetries()) { + throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, ex); + } + LOG.error("StreamLoad error, switch host {} and retry: ", host, ex); + host = this.getAvailableHost(); + } + } + } + /** * execute doris stream load */ - public void doStreamLoad(final DorisFlushBatch flushData) { + private void doStreamLoad(final DorisFlushBatch flushData, String host) { long start = System.currentTimeMillis(); - final String host = this.getAvailableHost(); - if (null == host) { + if (StringUtils.isEmpty(host)) { throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, "None of the load url can be connected."); } final String loadUrl = host + "/api/" + this.keys.getDatabase() + "/" + this.keys.getTable() + "/_stream_load"; @@ -130,28 +145,12 @@ public class DorisWriterEmitter { while (this.hostPos < targetHosts.size()) { final String host = targetHosts.get(hostPos); ++this.hostPos; - if (this.tryHttpConnection(host)) { - return host; - } + return host; } return null; } - private boolean tryHttpConnection(final String host) { - try { - final URL url = new URL(host); - final HttpURLConnection co = (HttpURLConnection) url.openConnection(); - co.setConnectTimeout(1000); - co.connect(); - co.disconnect(); - return true; - } catch (Exception e) { - LOG.warn("Failed to connect to address:{} , Exception ={}", host, e); - return false; - } - } - private Map doHttpPut(final String loadUrl, final DorisFlushBatch flushBatch) throws IOException { LOG.info(String.format("Executing stream load to: '%s', size: %s, rows: %d", loadUrl, flushBatch.getSize(), flushBatch.getRows())); @@ -181,10 +180,12 @@ public class DorisWriterEmitter { final HttpPut httpPut = new HttpPut(loadUrl); final List cols = this.keys.getColumns(); if (null != cols && !cols.isEmpty()) { - httpPut.setHeader("columns", String.join(",", cols)); + httpPut.setHeader("columns", String.join(",", cols.stream().map(item -> String.format("`%s`", item.trim().replace("`", ""))).collect(Collectors.toList()))); } - // put loadProps to http header + //set default header + setDefaultHeader(httpPut); + // put custom loadProps to http header final Map loadProps = this.keys.getLoadProps(); if (null != loadProps) { for (final Map.Entry entry : loadProps.entrySet()) { @@ -196,14 +197,9 @@ public class DorisWriterEmitter { httpPut.setHeader(HttpHeaders.EXPECT, "100-continue"); httpPut.setHeader(HttpHeaders.AUTHORIZATION, this.getBasicAuthHeader(this.keys.getUsername(), this.keys.getPassword())); httpPut.setHeader("label", flushBatch.getLabel()); - httpPut.setHeader("format", "json"); - httpPut.setHeader("line_delimiter", this.keys.getLineDelimiterDesc()); - httpPut.setHeader("read_json_by_line", "true"); - httpPut.setHeader("fuzzy_parse", "true"); // Use ByteArrayEntity instead of StringEntity to handle Chinese correctly - httpPut.setEntity(new ByteArrayEntity(flushBatch.getData().toString().getBytes())); - + httpPut.setEntity(new ByteArrayEntity(flushBatch.getData().getBytes())); httpPut.setConfig(requestConfig); try (final CloseableHttpResponse resp = httpclient.execute(httpPut)) { @@ -222,6 +218,21 @@ public class DorisWriterEmitter { } } + /** + * Set default request headers in json and csv formats. + * csv default delimiters are \x01 and \x02 + */ + private void setDefaultHeader(HttpPut httpPut) { + if (Key.DEFAULT_FORMAT_CSV.equalsIgnoreCase(this.keys.getFormat())) { + httpPut.setHeader("line_delimiter", this.keys.getLineDelimiter()); + httpPut.setHeader("column_separator", this.keys.getColumnSeparator()); + } else { + httpPut.setHeader("format", "json"); + httpPut.setHeader("strip_outer_array", "true"); + httpPut.setHeader("fuzzy_parse", "true"); + } + } + private String getBasicAuthHeader(final String username, final String password) { final String auth = username + ":" + password; final byte[] encodedAuth = Base64.getEncoder().encode(auth.getBytes()); @@ -231,44 +242,51 @@ public class DorisWriterEmitter { // for test public static void main(String[] args) throws IOException { String json = "{\n" + - " \"feLoadUrl\": [\"127.0.0.1:8030\"],\n" + - " \"column\": [\"k1\", \"k2\", \"k3\"],\n" + - " \"database\": \"db1\",\n" + + " \"beLoadUrl\": [\"127.0.0.1:8040\"],\n" + + " \"column\": [\"name\", \"age\", \"cdate\", \"cdatetime\"],\n" + + " \"database\": \"test\",\n" + " \"jdbcUrl\": \"jdbc:mysql://127.0.0.1:9030/\",\n" + " \"loadProps\": {\n" + +// " \"line_delimiter\": \"\\\\x03\",\n" + +// " \"column_separator\": \"\\\\x04\",\n" + " },\n" + - " \"password\": \"12345\",\n" + + " \"format\": \"csv\",\n" + + " \"password\": \"\",\n" + " \"postSql\": [],\n" + " \"preSql\": [],\n" + - " \"table\": \"t1\",\n" + + " \"table\": \"test_datax\",\n" + + " \"maxRetries\": \"0\",\n" + " \"username\": \"root\"\n" + " }"; Configuration configuration = Configuration.from(json); Key key = new Key(configuration); DorisWriterEmitter emitter = new DorisWriterEmitter(key); - DorisFlushBatch flushBatch = new DorisFlushBatch("\n"); - flushBatch.setLabel("test4"); + DorisFlushBatch flushBatch = new DorisFlushBatch(key.getLineDelimiter(), key.getFormat()); + Map row1 = Maps.newHashMap(); - row1.put("k1", "2021-02-02"); - row1.put("k2", "2021-02-02 00:00:00"); - row1.put("k3", "3"); - String rowStr1 = JSON.toJSONString(row1); - System.out.println("rows1: " + rowStr1); - flushBatch.putData(rowStr1); - + row1.put("cdate", "2021-02-02"); + row1.put("cdatetime", "2021-02-02 00:00:00"); + row1.put("name", "zhangsan"); + row1.put("age", "18"); Map row2 = Maps.newHashMap(); - row2.put("k1", "2021-02-03"); - row2.put("k2", "2021-02-03 00:00:00"); - row2.put("k3", "4"); + row2.put("cdate", "2022-02-02"); + row2.put("cdatetime", "2022-02-02 10:00:00"); + row2.put("name", "lisi"); + row2.put("age", "180"); + String rowStr1 = JSON.toJSONString(row1); String rowStr2 = JSON.toJSONString(row2); + if ("csv".equals(key.getFormat())) { + rowStr1 = String.join(EscapeHandler.escapeString(key.getColumnSeparator()), "2021-02-02", "2021-02-02 00:00:00", "zhangsan", "18"); + rowStr2 = String.join(EscapeHandler.escapeString(key.getColumnSeparator()), "2022-02-02", "2022-02-02 10:00:00", "lisi", "180"); + } + System.out.println("rows1: " + rowStr1); System.out.println("rows2: " + rowStr2); - flushBatch.putData(rowStr2); - for (int i = 0; i < 500000; ++i) { + for (int i = 0; i < 1; ++i) { + flushBatch.putData(rowStr1); flushBatch.putData(rowStr2); } - - emitter.doStreamLoad(flushBatch); + emitter.emit(flushBatch); } } diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/EscapeHandler.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/EscapeHandler.java new file mode 100644 index 00000000..91b0fbdb --- /dev/null +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/EscapeHandler.java @@ -0,0 +1,42 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package com.alibaba.datax.plugin.writer.doriswriter; + +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +/** + * Handler for escape in properties. + */ +public class EscapeHandler { + public static final String ESCAPE_DELIMITERS_FLAGS = "\\x"; + public static final Pattern ESCAPE_PATTERN = Pattern.compile("\\\\x([0-9|a-f|A-F]{2})"); + + public static String escapeString(String source) { + if (source.startsWith(ESCAPE_DELIMITERS_FLAGS)) { + Matcher m = ESCAPE_PATTERN.matcher(source); + StringBuffer buf = new StringBuffer(); + while (m.find()) { + m.appendReplacement(buf, String.format("%s", (char) Integer.parseInt(m.group(1), 16))); + } + m.appendTail(buf); + return buf.toString(); + } + return source; + } +} diff --git a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java index cfbef96c..72812a9c 100644 --- a/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java +++ b/doriswriter/src/main/java/com/alibaba/datax/plugin/writer/doriswriter/Key.java @@ -23,6 +23,8 @@ import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; import com.google.common.base.Strings; import java.io.Serializable; +import java.util.Arrays; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -34,6 +36,7 @@ public class Key implements Serializable { public static final String DATABASE = "database"; public static final String TABLE = "table"; public static final String COLUMN = "column"; + public static final String TIME_ZONE = "timeZone"; public static final String USERNAME = "username"; public static final String PASSWORD = "password"; @@ -42,28 +45,36 @@ public class Key implements Serializable { public static final String POST_SQL = "postSql"; public static final String LOAD_PROPS = "loadProps"; + public static final String LOAD_PROPS_LINE_DELIMITER = "line_delimiter"; + public static final String LOAD_PROPS_COLUMN_SEPARATOR = "column_separator"; + public static final String MAX_BATCH_ROWS = "maxBatchRows"; public static final String MAX_BATCH_BYTE_SIZE = "maxBatchByteSize"; + public static final String MAX_RETRIES = "maxRetries"; public static final String LABEL_PREFIX = "labelPrefix"; - public static final String LINE_DELIMITER = "lineDelimiter"; + public static final String FORMAT = "format"; public static final String CONNECT_TIMEOUT = "connectTimeout"; private final Configuration options; - private final String lineDelimiterDesc; private static final long DEFAULT_MAX_BATCH_ROWS = 50_0000; - private static final long DEFAULT_MAX_BATCH_BYTE_SIZE = 100 * 1024 * 1024; // 100MB + private static final long DEFAULT_MAX_BATCH_BYTE_SIZE = 90 * 1024 * 1024; // 90MB + private static final int DEFAULT_MAX_RETRIES = 0; + private static final String DEFAULT_LABEL_PREFIX = "datax_doris_writer_"; - private static final String DEFAULT_LINE_DELIMITER = "\n"; + private static final String DEFAULT_COLUMN_SEPARATOR = "\\x01"; + private static final String DEFAULT_LINE_DELIMITER = "\\x02"; + public static final String DEFAULT_FORMAT_CSV = "csv"; + private static final String DEFAULT_TIME_ZONE = "+08:00"; private static final int DEFAULT_CONNECT_TIMEOUT = -1; public Key(final Configuration options) { this.options = options; - this.lineDelimiterDesc = parseHexReadable(this.getLineDelimiter()); } public void doPretreatment() { this.validateRequired(); this.validateStreamLoadUrl(); + this.validateFormat(); } public String getJdbcUrl() { @@ -98,6 +109,10 @@ public class Key implements Serializable { return this.options.getList(COLUMN, String.class); } + public String getTimeZone() { + return this.options.getString(TIME_ZONE, DEFAULT_TIME_ZONE); + } + public List getPreSqlList() { return this.options.getList(PRE_SQL, String.class); } @@ -107,7 +122,7 @@ public class Key implements Serializable { } public Map getLoadProps() { - return this.options.getMap(LOAD_PROPS); + return this.options.getMap(LOAD_PROPS, new HashMap<>()); } public long getBatchRows() { @@ -118,22 +133,30 @@ public class Key implements Serializable { return this.options.getLong(MAX_BATCH_BYTE_SIZE, DEFAULT_MAX_BATCH_BYTE_SIZE); } + public int getMaxRetries() { + return this.options.getInt(MAX_RETRIES, DEFAULT_MAX_RETRIES); + } + public String getLabelPrefix() { return this.options.getString(LABEL_PREFIX, DEFAULT_LABEL_PREFIX); } public String getLineDelimiter() { - return this.options.getString(LINE_DELIMITER, DEFAULT_LINE_DELIMITER); + return getLoadProps().getOrDefault(LOAD_PROPS_LINE_DELIMITER, DEFAULT_LINE_DELIMITER).toString(); + } + + public String getFormat() { + return this.options.getString(FORMAT, DEFAULT_FORMAT_CSV); + } + + public String getColumnSeparator() { + return getLoadProps().getOrDefault(LOAD_PROPS_COLUMN_SEPARATOR, DEFAULT_COLUMN_SEPARATOR).toString(); } public int getConnectTimeout() { return this.options.getInt(CONNECT_TIMEOUT, DEFAULT_CONNECT_TIMEOUT); } - public String getLineDelimiterDesc() { - return lineDelimiterDesc; - } - private void validateStreamLoadUrl() { List urlList = this.getBeLoadUrlList(); if (urlList == null) { @@ -152,14 +175,11 @@ public class Key implements Serializable { } } - private String parseHexReadable(String s) { - byte[] separatorBytes = s.getBytes(); - StringBuilder desc = new StringBuilder(); - - for (byte separatorByte : separatorBytes) { - desc.append(String.format("\\x%02x", separatorByte)); + private void validateFormat() { + String format = this.getFormat(); + if (!Arrays.asList("csv", "json").contains(format.toLowerCase())) { + throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, "format only supports csv or json"); } - return desc.toString(); } private void validateRequired() { diff --git a/doriswriter/src/main/resources/plugin_job_template.json b/doriswriter/src/main/resources/plugin_job_template.json index 152f1eee..9cd9bb18 100644 --- a/doriswriter/src/main/resources/plugin_job_template.json +++ b/doriswriter/src/main/resources/plugin_job_template.json @@ -6,6 +6,7 @@ "database": "", "table": "", "column": [], + "timeZone": "", "preSql": [], "postSql": [], "jdbcUrl": "",