diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/StarRocksWriter.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/StarRocksWriter.java deleted file mode 100755 index 666a99d9..00000000 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/StarRocksWriter.java +++ /dev/null @@ -1,141 +0,0 @@ -package com.starrocks.connector.datax.plugin.writer.starrockswriter; - -import com.alibaba.datax.common.element.Record; -import com.alibaba.datax.common.exception.DataXException; -import com.alibaba.datax.common.plugin.RecordReceiver; -import com.alibaba.datax.common.spi.Writer; -import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.plugin.rdbms.util.DBUtil; -import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; -import com.alibaba.datax.plugin.rdbms.util.DataBaseType; -import com.starrocks.connector.datax.plugin.writer.starrockswriter.manager.StarRocksWriterManager; -import com.starrocks.connector.datax.plugin.writer.starrockswriter.row.StarRocksISerializer; -import com.starrocks.connector.datax.plugin.writer.starrockswriter.row.StarRocksSerializerFactory; -import com.starrocks.connector.datax.plugin.writer.starrockswriter.util.StarRocksWriterUtil; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Connection; -import java.util.ArrayList; -import java.util.List; - -public class StarRocksWriter extends Writer { - - public static class Job extends Writer.Job { - - private static final Logger LOG = LoggerFactory.getLogger(Job.class); - private Configuration originalConfig = null; - private StarRocksWriterOptions options; - - @Override - public void init() { - this.originalConfig = super.getPluginJobConf(); - options = new StarRocksWriterOptions(super.getPluginJobConf()); - options.doPretreatment(); - } - - @Override - public void preCheck(){ - this.init(); - StarRocksWriterUtil.preCheckPrePareSQL(options); - StarRocksWriterUtil.preCheckPostSQL(options); - } - - @Override - public void prepare() { - String username = options.getUsername(); - String password = options.getPassword(); - String jdbcUrl = options.getJdbcUrl(); - List renderedPreSqls = StarRocksWriterUtil.renderPreOrPostSqls(options.getPreSqlList(), options.getTable()); - if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) { - Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password); - LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPreSqls), jdbcUrl); - StarRocksWriterUtil.executeSqls(conn, renderedPreSqls); - DBUtil.closeDBResources(null, null, conn); - } - } - - @Override - public List split(int mandatoryNumber) { - List configurations = new ArrayList<>(mandatoryNumber); - for (int i = 0; i < mandatoryNumber; i++) { - configurations.add(originalConfig); - } - return configurations; - } - - @Override - public void post() { - String username = options.getUsername(); - String password = options.getPassword(); - String jdbcUrl = options.getJdbcUrl(); - List renderedPostSqls = StarRocksWriterUtil.renderPreOrPostSqls(options.getPostSqlList(), options.getTable()); - if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) { - Connection conn = DBUtil.getConnection(DataBaseType.MySql, jdbcUrl, username, password); - LOG.info("Begin to execute preSqls:[{}]. context info:{}.", String.join(";", renderedPostSqls), jdbcUrl); - StarRocksWriterUtil.executeSqls(conn, renderedPostSqls); - DBUtil.closeDBResources(null, null, conn); - } - } - - @Override - public void destroy() { - } - - } - - public static class Task extends Writer.Task { - private StarRocksWriterManager writerManager; - private StarRocksWriterOptions options; - private StarRocksISerializer rowSerializer; - - @Override - public void init() { - options = new StarRocksWriterOptions(super.getPluginJobConf()); - writerManager = new StarRocksWriterManager(options); - rowSerializer = StarRocksSerializerFactory.createSerializer(options); - } - - @Override - public void prepare() { - } - - public void startWrite(RecordReceiver recordReceiver) { - try { - Record record; - while ((record = recordReceiver.getFromReader()) != null) { - if (record.getColumnNumber() != options.getColumns().size()) { - throw DataXException - .asDataXException( - DBUtilErrorCode.CONF_ERROR, - String.format( - "列配置信息有错误. 因为您配置的任务中,源头读取字段数:%s 与 目的表要写入的字段数:%s 不相等. 请检查您的配置并作出修改.", - record.getColumnNumber(), - options.getColumns().size())); - } - writerManager.writeRecord(rowSerializer.serialize(record)); - } - } catch (Exception e) { - throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e); - } - } - - @Override - public void post() { - try { - writerManager.close(); - } catch (Exception e) { - throw DataXException.asDataXException(DBUtilErrorCode.WRITE_DATA_ERROR, e); - } - } - - @Override - public void destroy() {} - - @Override - public boolean supportFailOver(){ - return false; - } - } -} diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/StarRocksWriterOptions.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/StarRocksWriterOptions.java deleted file mode 100644 index 5180512f..00000000 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/StarRocksWriterOptions.java +++ /dev/null @@ -1,146 +0,0 @@ -package com.starrocks.connector.datax.plugin.writer.starrockswriter; - -import java.io.Serializable; - -import com.alibaba.datax.common.exception.DataXException; -import com.alibaba.datax.common.util.Configuration; -import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; - -import java.util.List; -import java.util.Map; -import java.util.stream.Collectors; - -public class StarRocksWriterOptions implements Serializable { - - private static final long serialVersionUID = 1l; - private static final long KILO_BYTES_SCALE = 1024l; - private static final long MEGA_BYTES_SCALE = KILO_BYTES_SCALE * KILO_BYTES_SCALE; - private static final int MAX_RETRIES = 1; - private static final int BATCH_ROWS = 500000; - private static final long BATCH_BYTES = 90 * MEGA_BYTES_SCALE; - - private static final String KEY_LOAD_PROPS_FORMAT = "format"; - public enum StreamLoadFormat { - CSV, JSON; - } - - private static final String KEY_USERNAME = "username"; - private static final String KEY_PASSWORD = "password"; - private static final String KEY_DATABASE = "database"; - private static final String KEY_TABLE = "table"; - private static final String KEY_COLUMN = "column"; - private static final String KEY_PRE_SQL = "preSql"; - private static final String KEY_POST_SQL = "postSql"; - private static final String KEY_JDBC_URL = "jdbcUrl"; - private static final String KEY_MAX_BATCH_ROWS = "maxBatchRows"; - private static final String KEY_MAX_BATCH_SIZE = "maxBatchSize"; - private static final String KEY_LOAD_URL = "loadUrl"; - private static final String KEY_FLUSH_QUEUE_LENGTH = "flushQueueLength"; - private static final String KEY_LOAD_PROPS = "loadProps"; - - private final Configuration options; - - public StarRocksWriterOptions(Configuration options) { - this.options = options; - } - - public void doPretreatment() { - validateRequired(); - validateStreamLoadUrl(); - } - - public String getJdbcUrl() { - return options.getString(KEY_JDBC_URL); - } - - public String getDatabase() { - return options.getString(KEY_DATABASE); - } - - public String getTable() { - return options.getString(KEY_TABLE); - } - - public String getUsername() { - return options.getString(KEY_USERNAME); - } - - public String getPassword() { - return options.getString(KEY_PASSWORD); - } - - public List getLoadUrlList() { - return options.getList(KEY_LOAD_URL, String.class); - } - - public List getColumns() { - return options.getList(KEY_COLUMN, String.class).stream().map(str -> str.replace("`", "")).collect(Collectors.toList()); - } - - public List getPreSqlList() { - return options.getList(KEY_PRE_SQL, String.class); - } - - public List getPostSqlList() { - return options.getList(KEY_POST_SQL, String.class); - } - - public Map getLoadProps() { - return options.getMap(KEY_LOAD_PROPS); - } - - public int getMaxRetries() { - return MAX_RETRIES; - } - - public int getBatchRows() { - Integer rows = options.getInt(KEY_MAX_BATCH_ROWS); - return null == rows ? BATCH_ROWS : rows; - } - - public long getBatchSize() { - Long size = options.getLong(KEY_MAX_BATCH_SIZE); - return null == size ? BATCH_BYTES : size; - } - - public int getFlushQueueLength() { - Integer len = options.getInt(KEY_FLUSH_QUEUE_LENGTH); - return null == len ? 1 : len; - } - - public StreamLoadFormat getStreamLoadFormat() { - Map loadProps = getLoadProps(); - if (null == loadProps) { - return StreamLoadFormat.CSV; - } - if (loadProps.containsKey(KEY_LOAD_PROPS_FORMAT) - && StreamLoadFormat.JSON.name().equalsIgnoreCase(String.valueOf(loadProps.get(KEY_LOAD_PROPS_FORMAT)))) { - return StreamLoadFormat.JSON; - } - return StreamLoadFormat.CSV; - } - - private void validateStreamLoadUrl() { - List urlList = getLoadUrlList(); - for (String host : urlList) { - if (host.split(":").length < 2) { - throw DataXException.asDataXException(DBUtilErrorCode.CONF_ERROR, - "loadUrl的格式不正确,请输入 `fe_ip:fe_http_ip;fe_ip:fe_http_ip`。"); - } - } - } - - private void validateRequired() { - final String[] requiredOptionKeys = new String[]{ - KEY_USERNAME, - KEY_PASSWORD, - KEY_DATABASE, - KEY_TABLE, - KEY_COLUMN, - KEY_LOAD_URL - }; - for (String optionKey : requiredOptionKeys) { - options.getNecessaryValue(optionKey, DBUtilErrorCode.REQUIRED_VALUE); - } - } -} diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/manager/StarRocksFlushTuple.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/manager/StarRocksFlushTuple.java deleted file mode 100644 index cd8c663b..00000000 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/manager/StarRocksFlushTuple.java +++ /dev/null @@ -1,20 +0,0 @@ -package com.starrocks.connector.datax.plugin.writer.starrockswriter.manager; - -import java.util.List; - -public class StarRocksFlushTuple { - - private String label; - private Long bytes; - private List rows; - - public StarRocksFlushTuple(String label, Long bytes, List rows) { - this.label = label; - this.bytes = bytes; - this.rows = rows; - } - - public String getLabel() { return label; } - public Long getBytes() { return bytes; } - public List getRows() { return rows; } -} \ No newline at end of file diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/manager/StarRocksStreamLoadVisitor.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/manager/StarRocksStreamLoadVisitor.java deleted file mode 100644 index 84fc5200..00000000 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/manager/StarRocksStreamLoadVisitor.java +++ /dev/null @@ -1,175 +0,0 @@ -package com.starrocks.connector.datax.plugin.writer.starrockswriter.manager; - -import java.io.IOException; -import java.net.HttpURLConnection; -import java.net.URL; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; - -import com.alibaba.fastjson.JSON; -import com.starrocks.connector.datax.plugin.writer.starrockswriter.StarRocksWriterOptions; -import com.starrocks.connector.datax.plugin.writer.starrockswriter.row.StarRocksDelimiterParser; - -import org.apache.commons.codec.binary.Base64; -import org.apache.http.HttpEntity; -import org.apache.http.client.config.RequestConfig; -import org.apache.http.client.methods.CloseableHttpResponse; -import org.apache.http.client.methods.HttpPut; -import org.apache.http.entity.ByteArrayEntity; -import org.apache.http.impl.client.CloseableHttpClient; -import org.apache.http.impl.client.DefaultRedirectStrategy; -import org.apache.http.impl.client.HttpClientBuilder; -import org.apache.http.impl.client.HttpClients; -import org.apache.http.util.EntityUtils; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; -import java.util.Map; - - -public class StarRocksStreamLoadVisitor { - - private static final Logger LOG = LoggerFactory.getLogger(StarRocksStreamLoadVisitor.class); - - private final StarRocksWriterOptions writerOptions; - private int pos; - - public StarRocksStreamLoadVisitor(StarRocksWriterOptions writerOptions) { - this.writerOptions = writerOptions; - } - - public void doStreamLoad(StarRocksFlushTuple flushData) throws IOException { - String host = getAvailableHost(); - if (null == host) { - throw new IOException("None of the host in `load_url` could be connected."); - } - String loadUrl = new StringBuilder(host) - .append("/api/") - .append(writerOptions.getDatabase()) - .append("/") - .append(writerOptions.getTable()) - .append("/_stream_load") - .toString(); - LOG.debug(String.format("Start to join batch data: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel())); - Map loadResult = doHttpPut(loadUrl, flushData.getLabel(), joinRows(flushData.getRows(), flushData.getBytes().intValue())); - final String keyStatus = "Status"; - if (null == loadResult || !loadResult.containsKey(keyStatus)) { - throw new IOException("Unable to flush data to StarRocks: unknown result status."); - } - LOG.debug(new StringBuilder("StreamLoad response:\n").append(JSON.toJSONString(loadResult)).toString()); - if (loadResult.get(keyStatus).equals("Fail")) { - throw new IOException( - new StringBuilder("Failed to flush data to StarRocks.\n").append(JSON.toJSONString(loadResult)).toString() - ); - } - } - - private String getAvailableHost() { - List hostList = writerOptions.getLoadUrlList(); - if (pos >= hostList.size()) { - pos = 0; - } - for (; pos < hostList.size(); pos++) { - String host = new StringBuilder("http://").append(hostList.get(pos)).toString(); - if (tryHttpConnection(host)) { - return host; - } - } - return null; - } - - private boolean tryHttpConnection(String host) { - try { - URL url = new URL(host); - HttpURLConnection co = (HttpURLConnection) url.openConnection(); - co.setConnectTimeout(1000); - co.connect(); - co.disconnect(); - return true; - } catch (Exception e1) { - LOG.warn("Failed to connect to address:{}", host, e1); - return false; - } - } - - private byte[] joinRows(List rows, int totalBytes) { - if (StarRocksWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) { - Map props = writerOptions.getLoadProps(); - byte[] lineDelimiter = StarRocksDelimiterParser.parse(String.valueOf(props.get("row_delimiter")), "\n").getBytes(StandardCharsets.UTF_8); - ByteBuffer bos = ByteBuffer.allocate(totalBytes + rows.size() * lineDelimiter.length); - for (String row : rows) { - bos.put(row.getBytes(StandardCharsets.UTF_8)); - bos.put(lineDelimiter); - } - return bos.array(); - } - - if (StarRocksWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) { - ByteBuffer bos = ByteBuffer.allocate(totalBytes + (rows.isEmpty() ? 2 : rows.size() + 1)); - bos.put("[".getBytes(StandardCharsets.UTF_8)); - byte[] jsonDelimiter = ",".getBytes(StandardCharsets.UTF_8); - boolean isFirstElement = true; - for (String row : rows) { - if (!isFirstElement) { - bos.put(jsonDelimiter); - } - bos.put(row.getBytes(StandardCharsets.UTF_8)); - isFirstElement = false; - } - bos.put("]".getBytes(StandardCharsets.UTF_8)); - return bos.array(); - } - throw new RuntimeException("Failed to join rows data, unsupported `format` from stream load properties:"); - } - - @SuppressWarnings("unchecked") - private Map doHttpPut(String loadUrl, String label, byte[] data) throws IOException { - LOG.info(String.format("Executing stream load to: '%s', size: '%s'", loadUrl, data.length)); - final HttpClientBuilder httpClientBuilder = HttpClients.custom() - .setRedirectStrategy(new DefaultRedirectStrategy() { - @Override - protected boolean isRedirectable(String method) { - return true; - } - }); - try (CloseableHttpClient httpclient = httpClientBuilder.build()) { - HttpPut httpPut = new HttpPut(loadUrl); - List cols = writerOptions.getColumns(); - if (null != cols && !cols.isEmpty()) { - httpPut.setHeader("columns", String.join(",", cols)); - } - if (null != writerOptions.getLoadProps()) { - for (Map.Entry entry : writerOptions.getLoadProps().entrySet()) { - httpPut.setHeader(entry.getKey(), String.valueOf(entry.getValue())); - } - } - httpPut.setHeader("Expect", "100-continue"); - httpPut.setHeader("label", label); - httpPut.setHeader("Content-Type", "application/x-www-form-urlencoded"); - httpPut.setHeader("Authorization", getBasicAuthHeader(writerOptions.getUsername(), writerOptions.getPassword())); - httpPut.setEntity(new ByteArrayEntity(data)); - httpPut.setConfig(RequestConfig.custom().setRedirectsEnabled(true).build()); - try (CloseableHttpResponse resp = httpclient.execute(httpPut)) { - int code = resp.getStatusLine().getStatusCode(); - if (200 != code) { - LOG.warn("Request failed with code:{}", code); - return null; - } - HttpEntity respEntity = resp.getEntity(); - if (null == respEntity) { - LOG.warn("Request failed with empty response."); - return null; - } - return (Map)JSON.parse(EntityUtils.toString(respEntity)); - } - } - } - - private String getBasicAuthHeader(String username, String password) { - String auth = username + ":" + password; - byte[] encodedAuth = Base64.encodeBase64(auth.getBytes()); - return new StringBuilder("Basic ").append(new String(encodedAuth)).toString(); - } - -} diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/manager/StarRocksWriterManager.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/manager/StarRocksWriterManager.java deleted file mode 100644 index 4e53adeb..00000000 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/manager/StarRocksWriterManager.java +++ /dev/null @@ -1,144 +0,0 @@ -package com.starrocks.connector.datax.plugin.writer.starrockswriter.manager; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.io.IOException; -import java.util.ArrayList; -import java.util.List; -import java.util.UUID; -import java.util.concurrent.LinkedBlockingDeque; - -import com.starrocks.connector.datax.plugin.writer.starrockswriter.StarRocksWriterOptions; -import com.google.common.base.Strings; - -public class StarRocksWriterManager { - - private static final Logger LOG = LoggerFactory.getLogger(StarRocksWriterManager.class); - - private final StarRocksStreamLoadVisitor starrocksStreamLoadVisitor; - private final StarRocksWriterOptions writerOptions; - - private final List buffer = new ArrayList<>(); - private int batchCount = 0; - private long batchSize = 0; - private volatile boolean closed = false; - private volatile Exception flushException; - private final LinkedBlockingDeque flushQueue; - - public StarRocksWriterManager(StarRocksWriterOptions writerOptions) { - this.writerOptions = writerOptions; - this.starrocksStreamLoadVisitor = new StarRocksStreamLoadVisitor(writerOptions); - flushQueue = new LinkedBlockingDeque<>(writerOptions.getFlushQueueLength()); - this.startAsyncFlushing(); - } - - public final synchronized void writeRecord(String record) throws IOException { - checkFlushException(); - try { - buffer.add(record); - batchCount++; - batchSize += record.getBytes().length; - if (batchCount >= writerOptions.getBatchRows() || batchSize >= writerOptions.getBatchSize()) { - String label = createBatchLabel(); - LOG.debug(String.format("StarRocks buffer Sinking triggered: rows[%d] label[%s].", batchCount, label)); - flush(label, false); - } - } catch (Exception e) { - throw new IOException("Writing records to StarRocks failed.", e); - } - } - - public synchronized void flush(String label, boolean waitUtilDone) throws Exception { - checkFlushException(); - if (batchCount == 0) { - if (waitUtilDone) { - waitAsyncFlushingDone(); - } - return; - } - flushQueue.put(new StarRocksFlushTuple(label, batchSize, new ArrayList<>(buffer))); - if (waitUtilDone) { - // wait the last flush - waitAsyncFlushingDone(); - } - buffer.clear(); - batchCount = 0; - batchSize = 0; - } - - public synchronized void close() { - if (!closed) { - closed = true; - try { - String label = createBatchLabel(); - if (batchCount > 0) LOG.debug(String.format("StarRocks Sink is about to close: label[%s].", label)); - flush(label, true); - } catch (Exception e) { - throw new RuntimeException("Writing records to StarRocks failed.", e); - } - } - checkFlushException(); - } - - public String createBatchLabel() { - return UUID.randomUUID().toString(); - } - - private void startAsyncFlushing() { - // start flush thread - Thread flushThread = new Thread(new Runnable(){ - public void run() { - while(true) { - try { - asyncFlush(); - } catch (Exception e) { - flushException = e; - } - } - } - }); - flushThread.setDaemon(true); - flushThread.start(); - } - - private void waitAsyncFlushingDone() throws InterruptedException { - // wait previous flushings - for (int i = 0; i <= writerOptions.getFlushQueueLength(); i++) { - flushQueue.put(new StarRocksFlushTuple("", 0l, null)); - } - } - - private void asyncFlush() throws Exception { - StarRocksFlushTuple flushData = flushQueue.take(); - if (Strings.isNullOrEmpty(flushData.getLabel())) { - return; - } - LOG.debug(String.format("Async stream load: rows[%d] bytes[%d] label[%s].", flushData.getRows().size(), flushData.getBytes(), flushData.getLabel())); - for (int i = 0; i <= writerOptions.getMaxRetries(); i++) { - try { - // flush to StarRocks with stream load - starrocksStreamLoadVisitor.doStreamLoad(flushData); - LOG.info(String.format("Async stream load finished: label[%s].", flushData.getLabel())); - break; - } catch (Exception e) { - LOG.warn("Failed to flush batch data to StarRocks, retry times = {}", i, e); - if (i >= writerOptions.getMaxRetries()) { - throw new IOException(e); - } - try { - Thread.sleep(1000l * (i + 1)); - } catch (InterruptedException ex) { - Thread.currentThread().interrupt(); - throw new IOException("Unable to flush, interrupted while doing another attempt", e); - } - } - } - } - - private void checkFlushException() { - if (flushException != null) { - throw new RuntimeException("Writing records to StarRocks failed.", flushException); - } - } -} diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/row/StarRocksBaseSerializer.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/row/StarRocksBaseSerializer.java deleted file mode 100644 index 77d25f12..00000000 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/row/StarRocksBaseSerializer.java +++ /dev/null @@ -1,18 +0,0 @@ -package com.starrocks.connector.datax.plugin.writer.starrockswriter.row; - -import com.alibaba.datax.common.element.Column; -import com.alibaba.datax.common.element.Column.Type; - -public class StarRocksBaseSerializer { - - protected String fieldConvertion(Column col) { - if (null == col.getRawData()) { - return null; - } - if (Type.BOOL == col.getType()) { - return String.valueOf(col.asLong()); - } - return col.asString(); - } - -} diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/row/StarRocksCsvSerializer.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/row/StarRocksCsvSerializer.java deleted file mode 100644 index 1366d570..00000000 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/row/StarRocksCsvSerializer.java +++ /dev/null @@ -1,32 +0,0 @@ -package com.starrocks.connector.datax.plugin.writer.starrockswriter.row; - -import java.io.StringWriter; - -import com.alibaba.datax.common.element.Record; - -import com.google.common.base.Strings; - -public class StarRocksCsvSerializer extends StarRocksBaseSerializer implements StarRocksISerializer { - - private static final long serialVersionUID = 1L; - - private final String columnSeparator; - - public StarRocksCsvSerializer(String sp) { - this.columnSeparator = StarRocksDelimiterParser.parse(sp, "\t"); - } - - @Override - public String serialize(Record row) { - StringBuilder sb = new StringBuilder(); - for (int i = 0; i < row.getColumnNumber(); i++) { - String value = fieldConvertion(row.getColumn(i)); - sb.append(null == value ? "\\N" : value); - if (i < row.getColumnNumber() - 1) { - sb.append(columnSeparator); - } - } - return sb.toString(); - } - -} diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/row/StarRocksDelimiterParser.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/row/StarRocksDelimiterParser.java deleted file mode 100644 index 04301e0f..00000000 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/row/StarRocksDelimiterParser.java +++ /dev/null @@ -1,55 +0,0 @@ -package com.starrocks.connector.datax.plugin.writer.starrockswriter.row; - -import java.io.StringWriter; - -import com.google.common.base.Strings; - -public class StarRocksDelimiterParser { - - private static final String HEX_STRING = "0123456789ABCDEF"; - - public static String parse(String sp, String dSp) throws RuntimeException { - if (Strings.isNullOrEmpty(sp)) { - return dSp; - } - if (!sp.toUpperCase().startsWith("\\X")) { - return sp; - } - String hexStr = sp.substring(2); - // check hex str - if (hexStr.isEmpty()) { - throw new RuntimeException("Failed to parse delimiter: `Hex str is empty`"); - } - if (hexStr.length() % 2 != 0) { - throw new RuntimeException("Failed to parse delimiter: `Hex str length error`"); - } - for (char hexChar : hexStr.toUpperCase().toCharArray()) { - if (HEX_STRING.indexOf(hexChar) == -1) { - throw new RuntimeException("Failed to parse delimiter: `Hex str format error`"); - } - } - // transform to separator - StringWriter writer = new StringWriter(); - for (byte b : hexStrToBytes(hexStr)) { - writer.append((char) b); - } - return writer.toString(); - } - - private static byte[] hexStrToBytes(String hexStr) { - String upperHexStr = hexStr.toUpperCase(); - int length = upperHexStr.length() / 2; - char[] hexChars = upperHexStr.toCharArray(); - byte[] bytes = new byte[length]; - for (int i = 0; i < length; i++) { - int pos = i * 2; - bytes[i] = (byte) (charToByte(hexChars[pos]) << 4 | charToByte(hexChars[pos + 1])); - } - return bytes; - } - - private static byte charToByte(char c) { - return (byte) HEX_STRING.indexOf(c); - } - -} diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/row/StarRocksISerializer.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/row/StarRocksISerializer.java deleted file mode 100644 index 7bcb8973..00000000 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/row/StarRocksISerializer.java +++ /dev/null @@ -1,11 +0,0 @@ -package com.starrocks.connector.datax.plugin.writer.starrockswriter.row; - -import java.io.Serializable; - -import com.alibaba.datax.common.element.Record; - -public interface StarRocksISerializer extends Serializable { - - String serialize(Record row); - -} diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/row/StarRocksJsonSerializer.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/row/StarRocksJsonSerializer.java deleted file mode 100644 index 60faa1be..00000000 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/row/StarRocksJsonSerializer.java +++ /dev/null @@ -1,34 +0,0 @@ -package com.starrocks.connector.datax.plugin.writer.starrockswriter.row; - -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import com.alibaba.datax.common.element.Record; -import com.alibaba.fastjson.JSON; - -public class StarRocksJsonSerializer extends StarRocksBaseSerializer implements StarRocksISerializer { - - private static final long serialVersionUID = 1L; - - private final List fieldNames; - - public StarRocksJsonSerializer(List fieldNames) { - this.fieldNames = fieldNames; - } - - @Override - public String serialize(Record row) { - if (null == fieldNames) { - return ""; - } - Map rowMap = new HashMap<>(fieldNames.size()); - int idx = 0; - for (String fieldName : fieldNames) { - rowMap.put(fieldName, fieldConvertion(row.getColumn(idx))); - idx++; - } - return JSON.toJSONString(rowMap); - } - -} diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/row/StarRocksSerializerFactory.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/row/StarRocksSerializerFactory.java deleted file mode 100644 index 85f446cd..00000000 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/row/StarRocksSerializerFactory.java +++ /dev/null @@ -1,22 +0,0 @@ -package com.starrocks.connector.datax.plugin.writer.starrockswriter.row; - -import java.util.Map; - -import com.starrocks.connector.datax.plugin.writer.starrockswriter.StarRocksWriterOptions; - -public class StarRocksSerializerFactory { - - private StarRocksSerializerFactory() {} - - public static StarRocksISerializer createSerializer(StarRocksWriterOptions writerOptions) { - if (StarRocksWriterOptions.StreamLoadFormat.CSV.equals(writerOptions.getStreamLoadFormat())) { - Map props = writerOptions.getLoadProps(); - return new StarRocksCsvSerializer(null == props || !props.containsKey("column_separator") ? null : String.valueOf(props.get("column_separator"))); - } - if (StarRocksWriterOptions.StreamLoadFormat.JSON.equals(writerOptions.getStreamLoadFormat())) { - return new StarRocksJsonSerializer(writerOptions.getColumns()); - } - throw new RuntimeException("Failed to create row serializer, unsupported `format` from stream load properties."); - } - -} diff --git a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/util/StarRocksWriterUtil.java b/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/util/StarRocksWriterUtil.java deleted file mode 100755 index c3b5d8d1..00000000 --- a/starrockswriter/src/main/java/com/starrocks/connector/datax/plugin/writer/doriswriter/util/StarRocksWriterUtil.java +++ /dev/null @@ -1,83 +0,0 @@ -package com.starrocks.connector.datax.plugin.writer.starrockswriter.util; - -import com.alibaba.datax.plugin.rdbms.util.DBUtil; -import com.alibaba.datax.plugin.rdbms.util.DataBaseType; -import com.alibaba.datax.plugin.rdbms.util.RdbmsException; -import com.alibaba.datax.plugin.rdbms.writer.Constant; -import com.alibaba.druid.sql.parser.ParserException; -import com.starrocks.connector.datax.plugin.writer.starrockswriter.StarRocksWriterOptions; -import com.google.common.base.Strings; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.sql.Connection; -import java.sql.Statement; -import java.util.*; - -public final class StarRocksWriterUtil { - private static final Logger LOG = LoggerFactory.getLogger(StarRocksWriterUtil.class); - - private StarRocksWriterUtil() {} - - public static List renderPreOrPostSqls(List preOrPostSqls, String tableName) { - if (null == preOrPostSqls) { - return Collections.emptyList(); - } - List renderedSqls = new ArrayList<>(); - for (String sql : preOrPostSqls) { - if (!Strings.isNullOrEmpty(sql)) { - renderedSqls.add(sql.replace(Constant.TABLE_NAME_PLACEHOLDER, tableName)); - } - } - return renderedSqls; - } - - public static void executeSqls(Connection conn, List sqls) { - Statement stmt = null; - String currentSql = null; - try { - stmt = conn.createStatement(); - for (String sql : sqls) { - currentSql = sql; - DBUtil.executeSqlWithoutResultSet(stmt, sql); - } - } catch (Exception e) { - throw RdbmsException.asQueryException(DataBaseType.MySql, e, currentSql, null, null); - } finally { - DBUtil.closeDBResources(null, stmt, null); - } - } - - public static void preCheckPrePareSQL(StarRocksWriterOptions options) { - String table = options.getTable(); - List preSqls = options.getPreSqlList(); - List renderedPreSqls = StarRocksWriterUtil.renderPreOrPostSqls(preSqls, table); - if (null != renderedPreSqls && !renderedPreSqls.isEmpty()) { - LOG.info("Begin to preCheck preSqls:[{}].", String.join(";", renderedPreSqls)); - for (String sql : renderedPreSqls) { - try { - DBUtil.sqlValid(sql, DataBaseType.MySql); - } catch (ParserException e) { - throw RdbmsException.asPreSQLParserException(DataBaseType.MySql,e,sql); - } - } - } - } - - public static void preCheckPostSQL(StarRocksWriterOptions options) { - String table = options.getTable(); - List postSqls = options.getPostSqlList(); - List renderedPostSqls = StarRocksWriterUtil.renderPreOrPostSqls(postSqls, table); - if (null != renderedPostSqls && !renderedPostSqls.isEmpty()) { - LOG.info("Begin to preCheck postSqls:[{}].", String.join(";", renderedPostSqls)); - for(String sql : renderedPostSqls) { - try { - DBUtil.sqlValid(sql, DataBaseType.MySql); - } catch (ParserException e){ - throw RdbmsException.asPostSQLParserException(DataBaseType.MySql,e,sql); - } - } - } - } -}