diff --git a/package.xml b/package.xml index 882dd23b..7daca069 100755 --- a/package.xml +++ b/package.xml @@ -357,6 +357,13 @@ datax + + tsdbreader/target/datax/ + + **/*.* + + datax + adbpgwriter/target/datax/ diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Constant.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Constant.java index e42dedc0..f5069dc9 100644 --- a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Constant.java +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Constant.java @@ -16,6 +16,8 @@ public final class Constant { static final String DEFAULT_DATA_FORMAT = "yyyy-MM-dd HH:mm:ss"; public static final String METRIC_SPECIFY_KEY = "__metric__"; + public static final String METRIC_SPECIFY_KEY_PREFIX = METRIC_SPECIFY_KEY + "."; + public static final int METRIC_SPECIFY_KEY_PREFIX_LENGTH = METRIC_SPECIFY_KEY_PREFIX.length(); public static final String TS_SPECIFY_KEY = "__ts__"; public static final String VALUE_SPECIFY_KEY = "__value__"; diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Key.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Key.java index 14ee7e41..c8a3d7ae 100644 --- a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Key.java +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Key.java @@ -17,14 +17,19 @@ public class Key { // RDB for MySQL / ADB etc. static final String SINK_DB_TYPE = "sinkDbType"; static final String ENDPOINT = "endpoint"; + static final String USERNAME = "username"; + static final String PASSWORD = "password"; static final String COLUMN = "column"; static final String METRIC = "metric"; static final String FIELD = "field"; static final String TAG = "tag"; + static final String COMBINE = "combine"; static final String INTERVAL_DATE_TIME = "splitIntervalMs"; static final String BEGIN_DATE_TIME = "beginDateTime"; static final String END_DATE_TIME = "endDateTime"; + static final String HINT = "hint"; + static final Boolean COMBINE_DEFAULT_VALUE = false; static final Integer INTERVAL_DATE_TIME_DEFAULT_VALUE = 60; static final String TYPE_DEFAULT_VALUE = "TSDB"; static final Set TYPE_SET = new HashSet<>(); diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReader.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReader.java index 04b931c7..550a010a 100755 --- a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReader.java +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReader.java @@ -60,6 +60,15 @@ public class TSDBReader extends Reader { "The parameter [" + Key.ENDPOINT + "] is not set."); } + String username = originalConfig.getString(Key.USERNAME, null); + if (StringUtils.isBlank(username)) { + LOG.warn("The parameter [" + Key.USERNAME + "] is blank."); + } + String password = originalConfig.getString(Key.PASSWORD, null); + if (StringUtils.isBlank(password)) { + LOG.warn("The parameter [" + Key.PASSWORD + "] is blank."); + } + // tagK / field could be empty if ("TSDB".equals(type)) { List columns = originalConfig.getList(Key.COLUMN, String.class); @@ -76,7 +85,14 @@ public class TSDBReader extends Reader { "The parameter [" + Key.COLUMN + "] is not set."); } for (String specifyKey : Constant.MUST_CONTAINED_SPECIFY_KEYS) { - if (!columns.contains(specifyKey)) { + boolean containSpecifyKey = false; + for (String column : columns) { + if (column.startsWith(specifyKey)) { + containSpecifyKey = true; + break; + } + } + if (!containSpecifyKey) { throw DataXException.asDataXException( TSDBReaderErrorCode.ILLEGAL_VALUE, "The parameter [" + Key.COLUMN + "] should contain " @@ -99,6 +115,8 @@ public class TSDBReader extends Reader { "The parameter [" + Key.INTERVAL_DATE_TIME + "] should be great than zero."); } + Boolean isCombine = originalConfig.getBool(Key.COMBINE, Key.COMBINE_DEFAULT_VALUE); + SimpleDateFormat format = new SimpleDateFormat(Constant.DEFAULT_DATA_FORMAT); String startTime = originalConfig.getString(Key.BEGIN_DATE_TIME); Long startDate; @@ -168,14 +186,14 @@ public class TSDBReader extends Reader { startTime = format.parse(originalConfig.getString(Key.BEGIN_DATE_TIME)).getTime(); } catch (ParseException e) { throw DataXException.asDataXException( - TSDBReaderErrorCode.ILLEGAL_VALUE, "解析[" + Key.BEGIN_DATE_TIME + "]失败.", e); + TSDBReaderErrorCode.ILLEGAL_VALUE, "Analysis [" + Key.BEGIN_DATE_TIME + "] failed.", e); } long endTime; try { endTime = format.parse(originalConfig.getString(Key.END_DATE_TIME)).getTime(); } catch (ParseException e) { throw DataXException.asDataXException( - TSDBReaderErrorCode.ILLEGAL_VALUE, "解析[" + Key.END_DATE_TIME + "]失败.", e); + TSDBReaderErrorCode.ILLEGAL_VALUE, "Analysis [" + Key.END_DATE_TIME + "] failed.", e); } if (TimeUtils.isSecond(startTime)) { startTime *= 1000; @@ -186,13 +204,14 @@ public class TSDBReader extends Reader { DateTime startDateTime = new DateTime(TimeUtils.getTimeInHour(startTime)); DateTime endDateTime = new DateTime(TimeUtils.getTimeInHour(endTime)); + final Boolean isCombine = originalConfig.getBool(Key.COMBINE, Key.COMBINE_DEFAULT_VALUE); + if ("TSDB".equals(type)) { - // split by metric - for (String column : columns4TSDB) { + if (isCombine) { // split by time in hour while (startDateTime.isBefore(endDateTime)) { Configuration clone = this.originalConfig.clone(); - clone.set(Key.COLUMN, Collections.singletonList(column)); + clone.set(Key.COLUMN, columns4TSDB); clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis()); startDateTime = startDateTime.plusMillis(splitIntervalMs); @@ -202,15 +221,30 @@ public class TSDBReader extends Reader { LOG.info("Configuration: {}", JSON.toJSONString(clone)); } + } else { + // split by time in hour + while (startDateTime.isBefore(endDateTime)) { + // split by metric + for (String column : columns4TSDB) { + Configuration clone = this.originalConfig.clone(); + clone.set(Key.COLUMN, Collections.singletonList(column)); + + clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis()); + startDateTime = startDateTime.plusMillis(splitIntervalMs); + // Make sure the time interval is [start, end). + clone.set(Key.END_DATE_TIME, startDateTime.getMillis() - 1); + configurations.add(clone); + + LOG.info("Configuration: {}", JSON.toJSONString(clone)); + } + } } } else { - // split by metric - for (String metric : metrics) { - // split by time in hour + if (isCombine) { while (startDateTime.isBefore(endDateTime)) { Configuration clone = this.originalConfig.clone(); clone.set(Key.COLUMN, columns4RDB); - clone.set(Key.METRIC, Collections.singletonList(metric)); + clone.set(Key.METRIC, metrics); clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis()); startDateTime = startDateTime.plusMillis(splitIntervalMs); @@ -220,6 +254,24 @@ public class TSDBReader extends Reader { LOG.info("Configuration: {}", JSON.toJSONString(clone)); } + } else { + // split by time in hour + while (startDateTime.isBefore(endDateTime)) { + // split by metric + for (String metric : metrics) { + Configuration clone = this.originalConfig.clone(); + clone.set(Key.COLUMN, columns4RDB); + clone.set(Key.METRIC, Collections.singletonList(metric)); + + clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis()); + startDateTime = startDateTime.plusMillis(splitIntervalMs); + // Make sure the time interval is [start, end). + clone.set(Key.END_DATE_TIME, startDateTime.getMillis() - 1); + configurations.add(clone); + + LOG.info("Configuration: {}", JSON.toJSONString(clone)); + } + } } } return configurations; @@ -247,6 +299,8 @@ public class TSDBReader extends Reader { private TSDBConnection conn; private Long startTime; private Long endTime; + private Boolean isCombine; + private Map hint; @Override public void init() { @@ -265,11 +319,16 @@ public class TSDBReader extends Reader { this.tags = readerSliceConfig.getMap(Key.TAG); String address = readerSliceConfig.getString(Key.ENDPOINT); + String username = readerSliceConfig.getString(Key.USERNAME); + String password = readerSliceConfig.getString(Key.PASSWORD); - conn = new TSDBConnection(address); + conn = new TSDBConnection(address, username, password); this.startTime = readerSliceConfig.getLong(Key.BEGIN_DATE_TIME); this.endTime = readerSliceConfig.getLong(Key.END_DATE_TIME); + + this.isCombine = readerSliceConfig.getBool(Key.COMBINE, Key.COMBINE_DEFAULT_VALUE); + this.hint = readerSliceConfig.getMap(Key.HINT); } @Override @@ -283,29 +342,35 @@ public class TSDBReader extends Reader { if ("TSDB".equals(type)) { for (String metric : columns4TSDB) { final Map tags = this.tags == null ? - null : (Map) this.tags.get(metric); + null : (Map) this.tags.get(metric); if (fields == null || !fields.containsKey(metric)) { - conn.sendDPs(metric, tags, this.startTime, this.endTime, recordSender); + conn.sendDPs(metric, tags, this.startTime, this.endTime, recordSender, hint); } else { conn.sendDPs(metric, (List) fields.get(metric), - tags, this.startTime, this.endTime, recordSender); + tags, this.startTime, this.endTime, recordSender, hint); } } } else { - for (String metric : metrics) { + if (isCombine) { final Map tags = this.tags == null ? - null : (Map) this.tags.get(metric); - if (fields == null || !fields.containsKey(metric)) { - conn.sendRecords(metric, tags, startTime, endTime, columns4RDB, recordSender); - } else { - conn.sendRecords(metric, (List) fields.get(metric), - tags, startTime, endTime, columns4RDB, recordSender); + null : (Map) this.tags.get(metrics.get(0)); + conn.sendRecords(metrics, tags, startTime, endTime, columns4RDB, recordSender, hint); + } else { + for (String metric : metrics) { + final Map tags = this.tags == null ? + null : (Map) this.tags.get(metric); + if (fields == null || !fields.containsKey(metric)) { + conn.sendRecords(metric, tags, startTime, endTime, columns4RDB, isCombine, recordSender, hint); + } else { + conn.sendRecords(metric, (List) fields.get(metric), + tags, startTime, endTime, columns4RDB, recordSender, hint); + } } } } } catch (Exception e) { throw DataXException.asDataXException( - TSDBReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错!", e); + TSDBReaderErrorCode.ILLEGAL_VALUE, "Error in getting or sending data point!", e); } } diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/Connection4TSDB.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/Connection4TSDB.java index 500894bb..96cb7f9d 100644 --- a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/Connection4TSDB.java +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/Connection4TSDB.java @@ -22,6 +22,20 @@ public interface Connection4TSDB { */ String address(); + /** + * Get the address of Database. + * + * @return host+ip + */ + String username(); + + /** + * Get the address of Database. + * + * @return host+ip + */ + String password(); + /** * Get the version of Database. * @@ -46,22 +60,27 @@ public interface Connection4TSDB { /** * Send data points for TSDB with single field. */ - void sendDPs(String metric, Map tags, Long start, Long end, RecordSender recordSender) throws Exception; + void sendDPs(String metric, Map tags, Long start, Long end, RecordSender recordSender, Map hint) throws Exception; /** * Send data points for TSDB with multi fields. */ - void sendDPs(String metric, List fields, Map tags, Long start, Long end, RecordSender recordSender) throws Exception; + void sendDPs(String metric, List fields, Map tags, Long start, Long end, RecordSender recordSender, Map hint) throws Exception; /** * Send data points for RDB with single field. */ - void sendRecords(String metric, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender) throws Exception; + void sendRecords(String metric, Map tags, Long start, Long end, List columns4RDB, Boolean isCombine, RecordSender recordSender, Map hint) throws Exception; /** * Send data points for RDB with multi fields. */ - void sendRecords(String metric, List fields, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender) throws Exception; + void sendRecords(String metric, List fields, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender, Map hint) throws Exception; + + /** + * Send data points for RDB with single fields on combine mode. + */ + void sendRecords(List metrics, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender, Map hint) throws Exception; /** * Put data point. diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnection.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnection.java index 5426ab49..d466da39 100644 --- a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnection.java +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnection.java @@ -19,9 +19,13 @@ import java.util.Map; public class TSDBConnection implements Connection4TSDB { private String address; + private String username; + private String password; - public TSDBConnection(String address) { + public TSDBConnection(String address, String username, String password) { this.address = address; + this.username = username; + this.password = password; } @Override @@ -29,14 +33,24 @@ public class TSDBConnection implements Connection4TSDB { return address; } + @Override + public String username() { + return username; + } + + @Override + public String password() { + return password; + } + @Override public String version() { - return TSDBUtils.version(address); + return TSDBUtils.version(address, username, password); } @Override public String config() { - return TSDBUtils.config(address); + return TSDBUtils.config(address, username, password); } @Override @@ -45,23 +59,28 @@ public class TSDBConnection implements Connection4TSDB { } @Override - public void sendDPs(String metric, Map tags, Long start, Long end, RecordSender recordSender) throws Exception { - TSDBDump.dump4TSDB(this, metric, tags, start, end, recordSender); + public void sendDPs(String metric, Map tags, Long start, Long end, RecordSender recordSender, Map hint) throws Exception { + TSDBDump.dump4TSDB(this, metric, tags, start, end, recordSender, hint); } @Override - public void sendDPs(String metric, List fields, Map tags, Long start, Long end, RecordSender recordSender) throws Exception { - TSDBDump.dump4TSDB(this, metric, fields, tags, start, end, recordSender); + public void sendDPs(String metric, List fields, Map tags, Long start, Long end, RecordSender recordSender, Map hint) throws Exception { + TSDBDump.dump4TSDB(this, metric, fields, tags, start, end, recordSender, hint); } @Override - public void sendRecords(String metric, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender) throws Exception { - TSDBDump.dump4RDB(this, metric, tags, start, end, columns4RDB, recordSender); + public void sendRecords(String metric, Map tags, Long start, Long end, List columns4RDB, Boolean isCombine, RecordSender recordSender, Map hint) throws Exception { + TSDBDump.dump4RDB(this, metric, tags, start, end, columns4RDB, recordSender, hint); } @Override - public void sendRecords(String metric, List fields, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender) throws Exception { - TSDBDump.dump4RDB(this, metric, fields, tags, start, end, columns4RDB, recordSender); + public void sendRecords(List metrics, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender, Map hint) throws Exception { + TSDBDump.dump4RDB(this, metrics, tags, start, end, columns4RDB, recordSender, hint); + } + + @Override + public void sendRecords(String metric, List fields, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender, Map hint) throws Exception { + TSDBDump.dump4RDB(this, metric, fields, tags, start, end, columns4RDB, recordSender, hint); } @Override diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBDump.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBDump.java index 8bae3a70..c911a062 100644 --- a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBDump.java +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBDump.java @@ -9,10 +9,9 @@ import com.alibaba.fastjson.parser.Feature; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.HashMap; -import java.util.LinkedList; -import java.util.List; -import java.util.Map; +import java.util.*; + +import static com.alibaba.datax.plugin.reader.tsdbreader.Constant.METRIC_SPECIFY_KEY_PREFIX_LENGTH; /** * Copyright @ 2019 alibaba.com @@ -37,10 +36,10 @@ final class TSDBDump { } static void dump4TSDB(TSDBConnection conn, String metric, Map tags, - Long start, Long end, RecordSender sender) throws Exception { + Long start, Long end, RecordSender sender, Map hint) throws Exception { LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end); - String res = queryRange4SingleField(conn, metric, tags, start, end); + String res = queryRange4SingleField(conn, metric, tags, start, end, hint); List dps = getDps4TSDB(metric, res); if (dps == null || dps.isEmpty()) { return; @@ -49,10 +48,10 @@ final class TSDBDump { } static void dump4TSDB(TSDBConnection conn, String metric, List fields, Map tags, - Long start, Long end, RecordSender sender) throws Exception { + Long start, Long end, RecordSender sender, Map hint) throws Exception { LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end); - String res = queryRange4MultiFields(conn, metric, fields, tags, start, end); + String res = queryRange4MultiFields(conn, metric, fields, tags, start, end, hint); List dps = getDps4TSDB(metric, fields, res); if (dps == null || dps.isEmpty()) { return; @@ -61,10 +60,10 @@ final class TSDBDump { } static void dump4RDB(TSDBConnection conn, String metric, Map tags, - Long start, Long end, List columns4RDB, RecordSender sender) throws Exception { + Long start, Long end, List columns4RDB, RecordSender sender, Map hint) throws Exception { LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end); - String res = queryRange4SingleField(conn, metric, tags, start, end); + String res = queryRange4SingleField(conn, metric, tags, start, end, hint); List dps = getDps4RDB(metric, res); if (dps == null || dps.isEmpty()) { return; @@ -92,12 +91,71 @@ final class TSDBDump { } } + public static void dump4RDB(TSDBConnection conn, List metrics, Map tags, Long start, Long end, List columns4RDB, RecordSender sender, Map hint) throws Exception { + LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metrics, start, end); + + List dps = new LinkedList<>(); + for (String metric : metrics) { + String res = queryRange4SingleField(conn, metric, tags, start, end, hint); + final List dpList = getDps4RDB(metric, res); + if (dpList == null || dpList.isEmpty()) { + continue; + } + dps.addAll(dpList); + } + if (dps.isEmpty()) { + return; + } + Map> dpsCombinedByTs = new LinkedHashMap<>(); + for (DataPoint4TSDB dp : dps) { + final long ts = dp.getTimestamp(); + final Map dpsWithSameTs = dpsCombinedByTs.computeIfAbsent(ts, k -> new LinkedHashMap<>()); + dpsWithSameTs.put(dp.getMetric(), dp); + } + + for (Map.Entry> entry : dpsCombinedByTs.entrySet()) { + final Long ts = entry.getKey(); + final Map metricAndDps = entry.getValue(); + final Record record = sender.createRecord(); + DataPoint4TSDB tmpDp = null; + + for (final String column : columns4RDB) { + if (column.startsWith(Constant.METRIC_SPECIFY_KEY)) { + final String m = column.substring(METRIC_SPECIFY_KEY_PREFIX_LENGTH); + tmpDp = metricAndDps.get(m); + if (tmpDp == null) { + continue; + } + record.addColumn(getColumn(tmpDp.getValue())); + } else if (Constant.TS_SPECIFY_KEY.equals(column)) { + record.addColumn(new LongColumn(ts)); + } else if (Constant.VALUE_SPECIFY_KEY.equals(column)) { + // combine 模式下,不应该定义 __value__ 字段,因为 __metric__.xxx 字段会输出对应的 value 值 + throw new RuntimeException("The " + Constant.VALUE_SPECIFY_KEY + + " column should not be specified in combine mode!"); + } else { + // combine 模式下,应该确保 __metric__.xxx 字段的定义,放在 column 数组的最前面,以保证获取到 metric + if (tmpDp == null) { + throw new RuntimeException("These " + Constant.METRIC_SPECIFY_KEY_PREFIX + + " column should be placed first in the column array in combine mode!"); + } + final Object tagv = tmpDp.getTags().get(column); + if (tagv == null) { + continue; + } + record.addColumn(getColumn(tagv)); + } + } + sender.sendToWriter(record); + } + } + static void dump4RDB(TSDBConnection conn, String metric, List fields, Map tags, Long start, Long end, - List columns4RDB, RecordSender sender) throws Exception { + List columns4RDB, RecordSender sender, Map hint) throws Exception { LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end); - String res = queryRange4MultiFields(conn, metric, fields, tags, start, end); + String res = queryRange4MultiFields(conn, metric, fields, tags, start, end, hint); List dps = getDps4RDB(metric, fields, res); if (dps == null || dps.isEmpty()) { return; @@ -131,14 +189,16 @@ final class TSDBDump { valueColumn = new LongColumn((Long) value); } else if (value instanceof String) { valueColumn = new StringColumn((String) value); + } else if (value instanceof Integer) { + valueColumn = new LongColumn(((Integer)value).longValue()); } else { - throw new Exception(String.format("value 不支持类型: [%s]", value.getClass().getSimpleName())); + throw new Exception(String.format("value not supported type: [%s]", value.getClass().getSimpleName())); } return valueColumn; } private static String queryRange4SingleField(TSDBConnection conn, String metric, Map tags, - Long start, Long end) throws Exception { + Long start, Long end, Map hint) throws Exception { String tagKV = getFilterByTags(tags); String body = "{\n" + " \"start\": " + start + ",\n" + @@ -148,14 +208,15 @@ final class TSDBDump { " \"aggregator\": \"none\",\n" + " \"metric\": \"" + metric + "\"\n" + (tagKV == null ? "" : tagKV) + + (hint == null ? "" : (", \"hint\": " + JSON.toJSONString(hint))) + " }\n" + " ]\n" + "}"; - return HttpUtils.post(conn.address() + QUERY, body); + return HttpUtils.post(conn.address() + QUERY, conn.username(), conn.password(), body); } private static String queryRange4MultiFields(TSDBConnection conn, String metric, List fields, - Map tags, Long start, Long end) throws Exception { + Map tags, Long start, Long end, Map hint) throws Exception { // fields StringBuilder fieldBuilder = new StringBuilder(); fieldBuilder.append("\"fields\":["); @@ -177,10 +238,11 @@ final class TSDBDump { " \"metric\": \"" + metric + "\",\n" + fieldBuilder.toString() + (tagKV == null ? "" : tagKV) + + (hint == null ? "" : (", \"hint\": " + JSON.toJSONString(hint))) + " }\n" + " ]\n" + "}"; - return HttpUtils.post(conn.address() + QUERY_MULTI_FIELD, body); + return HttpUtils.post(conn.address() + QUERY_MULTI_FIELD, conn.username(), conn.password(), body); } private static String getFilterByTags(Map tags) { diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtils.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtils.java index 3e0be854..5cba4e54 100644 --- a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtils.java +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtils.java @@ -1,11 +1,13 @@ package com.alibaba.datax.plugin.reader.tsdbreader.util; import com.alibaba.fastjson.JSON; +import org.apache.commons.lang3.StringUtils; import org.apache.http.client.fluent.Content; import org.apache.http.client.fluent.Request; import org.apache.http.entity.ContentType; import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -22,13 +24,18 @@ public final class HttpUtils { public final static int CONNECT_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60); public final static int SOCKET_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60); + private static final String CREDENTIALS_FORMAT = "%s:%s"; + private static final String BASIC_AUTHENTICATION_FORMAT = "Basic %s"; + private HttpUtils() { } - public static String get(String url) throws Exception { - Content content = Request.Get(url) + public static String get(String url, String username, String password) throws Exception { + final Request request = Request.Get(url) .connectTimeout(CONNECT_TIMEOUT_DEFAULT_IN_MILL) - .socketTimeout(SOCKET_TIMEOUT_DEFAULT_IN_MILL) + .socketTimeout(SOCKET_TIMEOUT_DEFAULT_IN_MILL); + addAuth(request, username, password); + Content content = request .execute() .returnContent(); if (content == null) { @@ -37,24 +44,21 @@ public final class HttpUtils { return content.asString(StandardCharsets.UTF_8); } - public static String post(String url, Map params) throws Exception { - return post(url, JSON.toJSONString(params), CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL); + public static String post(String url, String username, String password, Map params) throws Exception { + return post(url, username, password, JSON.toJSONString(params), CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL); } - public static String post(String url, String params) throws Exception { - return post(url, params, CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL); + public static String post(String url, String username, String password, String params) throws Exception { + return post(url, username, password, params, CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL); } - public static String post(String url, Map params, + public static String post(String url, String username, String password, String params, int connectTimeoutInMill, int socketTimeoutInMill) throws Exception { - return post(url, JSON.toJSONString(params), connectTimeoutInMill, socketTimeoutInMill); - } - - public static String post(String url, String params, - int connectTimeoutInMill, int socketTimeoutInMill) throws Exception { - Content content = Request.Post(url) + Request request = Request.Post(url) .connectTimeout(connectTimeoutInMill) - .socketTimeout(socketTimeoutInMill) + .socketTimeout(socketTimeoutInMill); + addAuth(request, username, password); + Content content = request .addHeader("Content-Type", "application/json") .bodyString(params, ContentType.APPLICATION_JSON) .execute() @@ -64,4 +68,20 @@ public final class HttpUtils { } return content.asString(StandardCharsets.UTF_8); } + + private static void addAuth(Request request, String username, String password) { + String authorization = generateHttpAuthorization(username, password); + if (authorization != null) { + request.setHeader("Authorization", authorization); + } + } + + private static String generateHttpAuthorization(String username, String password) { + if (StringUtils.isBlank(username) || StringUtils.isBlank(password)) { + return null; + } + String credentials = String.format(CREDENTIALS_FORMAT, username, password); + credentials = Base64.getEncoder().encodeToString(credentials.getBytes()); + return String.format(BASIC_AUTHENTICATION_FORMAT, credentials); + } } diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TSDBUtils.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TSDBUtils.java index bb7b4b87..d91c3557 100644 --- a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TSDBUtils.java +++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TSDBUtils.java @@ -1,11 +1,5 @@ package com.alibaba.datax.plugin.reader.tsdbreader.util; -import com.alibaba.datax.plugin.reader.tsdbreader.conn.DataPoint4TSDB; -import com.alibaba.fastjson.JSON; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import java.util.List; /** * Copyright @ 2019 alibaba.com @@ -17,52 +11,28 @@ import java.util.List; */ public final class TSDBUtils { - private static final Logger LOGGER = LoggerFactory.getLogger(TSDBUtils.class); - private TSDBUtils() { } - public static String version(String address) { + public static String version(String address, String username, String password) { String url = String.format("%s/api/version", address); String rsp; try { - rsp = HttpUtils.get(url); + rsp = HttpUtils.get(url, username, password); } catch (Exception e) { throw new RuntimeException(e); } return rsp; } - public static String config(String address) { + public static String config(String address, String username, String password) { String url = String.format("%s/api/config", address); String rsp; try { - rsp = HttpUtils.get(url); + rsp = HttpUtils.get(url, username, password); } catch (Exception e) { throw new RuntimeException(e); } return rsp; } - - public static boolean put(String address, List dps) { - return put(address, JSON.toJSON(dps)); - } - - public static boolean put(String address, DataPoint4TSDB dp) { - return put(address, JSON.toJSON(dp)); - } - - private static boolean put(String address, Object o) { - String url = String.format("%s/api/put", address); - String rsp; - try { - rsp = HttpUtils.post(url, o.toString()); - // If successful, the returned content should be null. - assert rsp == null; - } catch (Exception e) { - LOGGER.error("Address: {}, DataPoints: {}", url, o); - throw new RuntimeException(e); - } - return true; - } } diff --git a/tsdbwriter/pom.xml b/tsdbwriter/pom.xml index 1fb7c1e0..fd4cc6f5 100644 --- a/tsdbwriter/pom.xml +++ b/tsdbwriter/pom.xml @@ -91,6 +91,13 @@ ${fastjson.version} + + + com.aliyun + hitsdb-client + 0.4.0-SNAPSHOT + + junit diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/Connection4TSDB.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/Connection4TSDB.java index 8119348d..ecb30055 100644 --- a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/Connection4TSDB.java +++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/Connection4TSDB.java @@ -21,6 +21,28 @@ public interface Connection4TSDB { */ String address(); + /** + * Get the setted database name. + * + * @return database + */ + String database(); + + + /** + * Get the username of Database. + * + * @return username + */ + String username(); + + /** + * Get the password of Database. + * + * @return password + */ + String password(); + /** * Get the version of Database. * @@ -69,17 +91,25 @@ public interface Connection4TSDB { boolean put(List dps); /** - * Put data points. + * Put data points with single field. * * @param dps data points * @return whether the data point is written successfully */ boolean put(String dps); + /** + * Put data points with multi fields. + * + * @param dps data points + * @return whether the data point is written successfully + */ + boolean mput(String dps); + /** * Whether current version is supported. * * @return true: supported; false: not yet! */ boolean isSupported(); -} +} \ No newline at end of file diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/TSDBConnection.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/TSDBConnection.java index e4ebad7d..074f0295 100644 --- a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/TSDBConnection.java +++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/TSDBConnection.java @@ -18,12 +18,18 @@ import java.util.List; public class TSDBConnection implements Connection4TSDB { private String address; + private String username; + private String password; + private String database; - public TSDBConnection(String address) { + public TSDBConnection(String address, String database, String username, String password) { if (StringUtils.isBlank(address)) { throw new RuntimeException("TSDBConnection init failed because address is blank!"); } this.address = address; + this.database = database; + this.username = username; + this.password = password; } @Override @@ -31,14 +37,29 @@ public class TSDBConnection implements Connection4TSDB { return address; } + @Override + public String username() { + return username; + } + + @Override + public String database() { + return database; + } + + @Override + public String password() { + return password; + } + @Override public String version() { - return TSDBUtils.version(address); + return TSDBUtils.version(address, username, password); } @Override public String config() { - return TSDBUtils.config(address); + return TSDBUtils.config(address, username, password); } @Override @@ -53,17 +74,22 @@ public class TSDBConnection implements Connection4TSDB { @Override public boolean put(DataPoint4TSDB dp) { - return TSDBUtils.put(address, dp); + return TSDBUtils.put(address, database, username, password, dp); } @Override public boolean put(List dps) { - return TSDBUtils.put(address, dps); + return TSDBUtils.put(address, database, username, password, dps); } @Override public boolean put(String dps) { - return TSDBUtils.put(address, dps); + return TSDBUtils.put(address, database, username, password, dps); + } + + @Override + public boolean mput(String dps) { + return TSDBUtils.mput(address, database, username, password, dps); } @Override diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/Key.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/Key.java index 2cc3f671..6cb239ec 100755 --- a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/Key.java +++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/Key.java @@ -10,8 +10,22 @@ package com.alibaba.datax.plugin.writer.tsdbwriter; */ public class Key { + static final String SOURCE_DB_TYPE = "sourceDbType"; + static final String MULTI_FIELD = "multiField"; + + // common static final String ENDPOINT = "endpoint"; + static final String USERNAME = "username"; + static final String PASSWORD = "password"; + static final String IGNORE_WRITE_ERROR = "ignoreWriteError"; + static final String DATABASE = "database"; + + // for tsdb static final String BATCH_SIZE = "batchSize"; static final String MAX_RETRY_TIME = "maxRetryTime"; - static final String IGNORE_WRITE_ERROR = "ignoreWriteError"; + + // for rdb + static final String COLUMN = "column"; + static final String COLUMN_TYPE = "columnType"; + static final String TABLE = "table"; } diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/SourceDBType.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/SourceDBType.java new file mode 100644 index 00000000..792806a6 --- /dev/null +++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/SourceDBType.java @@ -0,0 +1,5 @@ +package com.alibaba.datax.plugin.writer.tsdbwriter; + +public enum SourceDBType { + TSDB, RDB +} diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBConverter.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBConverter.java new file mode 100644 index 00000000..86e35c56 --- /dev/null +++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBConverter.java @@ -0,0 +1,96 @@ +package com.alibaba.datax.plugin.writer.tsdbwriter; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.fastjson.JSON; +import com.aliyun.hitsdb.client.value.request.MultiFieldPoint; +import com.aliyun.hitsdb.client.value.request.Point; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +class TSDBConverter { + + private static final Logger LOG = LoggerFactory.getLogger(TSDBConverter.class); + + private List columnName; + private List columnType; + + TSDBConverter(List columnName, List columnType) { + this.columnName = columnName; + this.columnType = columnType; + LOG.info("columnName: {}, columnType: {}", JSON.toJSONString(columnName), JSON.toJSONString(columnType)); + } + + List transRecord2Point(List records) { + List dps = new ArrayList(); + for (Record record : records) { + List metricBuilders = new ArrayList(); + Map tags = new HashMap(); + Long time = 0L; + + for (int i = 0; i < columnType.size(); i++) { + String type = columnType.get(i); + String name = columnName.get(i); + Column column = record.getColumn(i); + if (TSDBModel.TSDB_TAG.equals(type)) { + tags.put(name, column.asString()); + } else if (TSDBModel.TSDB_FIELD_DOUBLE.equals(type)) { + metricBuilders.add(new Point.MetricBuilder(name).value(column.asDouble())); + } else if (TSDBModel.TSDB_FIELD_STRING.equals(type)) { + metricBuilders.add(new Point.MetricBuilder(name).value(column.asString())); + } else if (TSDBModel.TSDB_FIELD_BOOL.equals(type)) { + metricBuilders.add(new Point.MetricBuilder(name).value(column.asBoolean())); + } else if (TSDBModel.TSDB_TIMESTAMP.equals(type)) { + time = column.asLong(); + } else if (TSDBModel.TSDB_METRIC_NUM.equals(type)) { + // compatible with previous usage of TSDB_METRIC_NUM + metricBuilders.add(new Point.MetricBuilder(name).value(column.asDouble())); + } else if (TSDBModel.TSDB_METRIC_STRING.equals(type)) { + // compatible with previous usage of TSDB_METRIC_STRING + metricBuilders.add(new Point.MetricBuilder(name).value(column.asString())); + } + } + for (Point.MetricBuilder metricBuilder : metricBuilders) { + dps.add(metricBuilder.tag(tags).timestamp(time).build(false)); + } + } + return dps; + } + + List transRecord2MultiFieldPoint(List records, String tableName) { + List dps = new ArrayList(); + for (Record record : records) { + MultiFieldPoint.MetricBuilder builder = MultiFieldPoint.metric(tableName); + for (int i = 0; i < columnType.size(); i++) { + String type = columnType.get(i); + String name = columnName.get(i); + Column column = record.getColumn(i); + if (TSDBModel.TSDB_TAG.equals(type)) { + builder.tag(name, column.asString()); + } else if (TSDBModel.TSDB_FIELD_DOUBLE.equals(type)) { + builder.field(name, column.asDouble()); + } else if (TSDBModel.TSDB_FIELD_STRING.equals(type)) { + builder.field(name, column.asString()); + } else if (TSDBModel.TSDB_FIELD_BOOL.equals(type)) { + builder.field(name, column.asBoolean()); + } else if (TSDBModel.TSDB_TIMESTAMP.equals(type)) { + builder.timestamp(column.asLong()); + } else if (TSDBModel.TSDB_METRIC_NUM.equals(type)) { + // compatible with previous usage of TSDB_METRIC_NUM + builder.field(name, column.asDouble()); + } else if (TSDBModel.TSDB_METRIC_STRING.equals(type)) { + // compatible with previous usage of TSDB_METRIC_STRING + builder.field(name, column.asString()); + } + } + MultiFieldPoint point = builder.build(false); + dps.add(point); + } + return dps; + } +} diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBModel.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBModel.java new file mode 100644 index 00000000..ead0e2cc --- /dev/null +++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBModel.java @@ -0,0 +1,11 @@ +package com.alibaba.datax.plugin.writer.tsdbwriter; + +class TSDBModel { + static final String TSDB_METRIC_NUM = "metric_num"; + static final String TSDB_METRIC_STRING = "metric_string"; + static final String TSDB_TAG = "tag"; + static final String TSDB_TIMESTAMP = "timestamp"; + static final String TSDB_FIELD_DOUBLE = "field_double"; + static final String TSDB_FIELD_STRING = "field_string"; + static final String TSDB_FIELD_BOOL = "field_bool"; +} \ No newline at end of file diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriter.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriter.java index e410b2ba..85a32a07 100755 --- a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriter.java +++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriter.java @@ -7,12 +7,20 @@ import com.alibaba.datax.common.spi.Writer; import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.RetryUtil; import com.alibaba.datax.plugin.writer.conn.TSDBConnection; +import com.aliyun.hitsdb.client.TSDB; +import com.aliyun.hitsdb.client.TSDBClientFactory; +import com.aliyun.hitsdb.client.TSDBConfig; +import com.aliyun.hitsdb.client.value.request.MultiFieldPoint; +import com.aliyun.hitsdb.client.value.request.Point; +import com.aliyun.hitsdb.client.value.response.batch.IgnoreErrorsResult; +import com.aliyun.hitsdb.client.value.response.batch.MultiFieldIgnoreErrorsResult; +import com.aliyun.hitsdb.client.value.response.batch.SummaryResult; import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.ArrayList; -import java.util.List; +import java.io.IOException; +import java.util.*; import java.util.concurrent.Callable; /** @@ -26,6 +34,9 @@ import java.util.concurrent.Callable; @SuppressWarnings("unused") public class TSDBWriter extends Writer { + private static SourceDBType DB_TYPE; + private static TSDB tsdb = null; + public static class Job extends Writer.Job { private static final Logger LOG = LoggerFactory.getLogger(Job.class); @@ -34,33 +45,99 @@ public class TSDBWriter extends Writer { @Override public void init() { - this.originalConfig = super.getPluginJobConf(); + originalConfig = super.getPluginJobConf(); - String address = this.originalConfig.getString(Key.ENDPOINT); - if (StringUtils.isBlank(address)) { + // check source db type + String sourceDbType = originalConfig.getString(Key.SOURCE_DB_TYPE); + if (StringUtils.isBlank(sourceDbType)) { + sourceDbType = SourceDBType.TSDB.name(); + originalConfig.set(Key.SOURCE_DB_TYPE, sourceDbType); + LOG.info("The parameter [" + Key.SOURCE_DB_TYPE + "] will be default value: " + SourceDBType.TSDB); + } + try { + DB_TYPE = SourceDBType.valueOf(sourceDbType); + } catch (Exception e) { throw DataXException.asDataXException(TSDBWriterErrorCode.REQUIRED_VALUE, - "The parameter [" + Key.ENDPOINT + "] is not set."); + "The parameter [" + Key.SOURCE_DB_TYPE + + "] is invalid, which should be one of [" + Arrays.toString(SourceDBType.values()) + "]."); } - Integer batchSize = this.originalConfig.getInt(Key.BATCH_SIZE); - if (batchSize == null || batchSize < 1) { - originalConfig.set(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_SIZE); - LOG.info("The parameter [" + Key.BATCH_SIZE + - "] will be default value: " + Constant.DEFAULT_BATCH_SIZE); - } + // for tsdb + if (DB_TYPE == SourceDBType.TSDB) { + String address = originalConfig.getString(Key.ENDPOINT); + if (StringUtils.isBlank(address)) { + throw DataXException.asDataXException(TSDBWriterErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.ENDPOINT + "] is not set."); + } - Integer retrySize = this.originalConfig.getInt(Key.MAX_RETRY_TIME); - if (retrySize == null || retrySize < 0) { - originalConfig.set(Key.MAX_RETRY_TIME, Constant.DEFAULT_TRY_SIZE); - LOG.info("The parameter [" + Key.MAX_RETRY_TIME + - "] will be default value: " + Constant.DEFAULT_TRY_SIZE); - } + String username = originalConfig.getString(Key.USERNAME, null); + if (StringUtils.isBlank(username)) { + LOG.warn("The parameter [" + Key.USERNAME + "] is blank."); + } + String password = originalConfig.getString(Key.PASSWORD, null); + if (StringUtils.isBlank(password)) { + LOG.warn("The parameter [" + Key.PASSWORD + "] is blank."); + } + + Integer batchSize = originalConfig.getInt(Key.BATCH_SIZE); + if (batchSize == null || batchSize < 1) { + originalConfig.set(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_SIZE); + LOG.info("The parameter [" + Key.BATCH_SIZE + + "] will be default value: " + Constant.DEFAULT_BATCH_SIZE); + } + + Integer retrySize = originalConfig.getInt(Key.MAX_RETRY_TIME); + if (retrySize == null || retrySize < 0) { + originalConfig.set(Key.MAX_RETRY_TIME, Constant.DEFAULT_TRY_SIZE); + LOG.info("The parameter [" + Key.MAX_RETRY_TIME + + "] will be default value: " + Constant.DEFAULT_TRY_SIZE); + } + + Boolean ignoreWriteError = originalConfig.getBool(Key.IGNORE_WRITE_ERROR); + if (ignoreWriteError == null) { + originalConfig.set(Key.IGNORE_WRITE_ERROR, Constant.DEFAULT_IGNORE_WRITE_ERROR); + LOG.info("The parameter [" + Key.IGNORE_WRITE_ERROR + + "] will be default value: " + Constant.DEFAULT_IGNORE_WRITE_ERROR); + } + } else if (DB_TYPE == SourceDBType.RDB) { + // for rdb + originalConfig.getNecessaryValue(Key.ENDPOINT, TSDBWriterErrorCode.REQUIRED_VALUE); + originalConfig.getNecessaryValue(Key.COLUMN_TYPE, TSDBWriterErrorCode.REQUIRED_VALUE); + originalConfig.getNecessaryValue(Key.COLUMN, TSDBWriterErrorCode.REQUIRED_VALUE); + String endpoint = originalConfig.getString(Key.ENDPOINT); + String[] split = endpoint.split(":"); + if (split.length != 3) { + throw DataXException.asDataXException(TSDBWriterErrorCode.REQUIRED_VALUE, + "The parameter [" + Key.ENDPOINT + "] is invalid, which should be [http://IP:Port]."); + } + String ip = split[1].substring(2); + int port = Integer.parseInt(split[2]); + + String username = originalConfig.getString(Key.USERNAME, null); + if (StringUtils.isBlank(username)) { + LOG.warn("The parameter [" + Key.USERNAME + "] is blank."); + } + + String password = originalConfig.getString(Key.PASSWORD, null); + if (StringUtils.isBlank(password)) { + LOG.warn("The parameter [" + Key.PASSWORD + "] is blank."); + } + + if (!StringUtils.isBlank(password) && !StringUtils.isBlank(username)) { + tsdb = TSDBClientFactory.connect(TSDBConfig.address(ip, port).basicAuth(username, password).config()); + } else { + tsdb = TSDBClientFactory.connect(TSDBConfig.address(ip, port).config()); + } + + String database = originalConfig.getString(Key.DATABASE, null); + if (StringUtils.isBlank(database)) { + LOG.info("The parameter [" + Key.DATABASE + "] is blank."); + } else { + tsdb.useDatabase(database); + } + + LOG.info("Tsdb config:" + originalConfig.toJSON()); - Boolean ignoreWriteError = this.originalConfig.getBool(Key.IGNORE_WRITE_ERROR); - if (ignoreWriteError == null) { - originalConfig.set(Key.IGNORE_WRITE_ERROR, Constant.DEFAULT_IGNORE_WRITE_ERROR); - LOG.info("The parameter [" + Key.IGNORE_WRITE_ERROR + - "] will be default value: " + Constant.DEFAULT_IGNORE_WRITE_ERROR); } } @@ -72,7 +149,7 @@ public class TSDBWriter extends Writer { public List split(int mandatoryNumber) { ArrayList configurations = new ArrayList(mandatoryNumber); for (int i = 0; i < mandatoryNumber; i++) { - configurations.add(this.originalConfig.clone()); + configurations.add(originalConfig.clone()); } return configurations; } @@ -83,6 +160,14 @@ public class TSDBWriter extends Writer { @Override public void destroy() { + if (DB_TYPE == SourceDBType.RDB) { + if (tsdb != null) { + try { + tsdb.close(); + } catch (IOException ignored) { + } + } + } } } @@ -91,18 +176,87 @@ public class TSDBWriter extends Writer { private static final Logger LOG = LoggerFactory.getLogger(Task.class); private TSDBConnection conn; + private boolean multiField; private int batchSize; private int retrySize; private boolean ignoreWriteError; + private String tableName; + private TSDBConverter tsdbConverter; @Override public void init() { Configuration writerSliceConfig = getPluginJobConf(); - String address = writerSliceConfig.getString(Key.ENDPOINT); - this.conn = new TSDBConnection(address); - this.batchSize = writerSliceConfig.getInt(Key.BATCH_SIZE); - this.retrySize = writerSliceConfig.getInt(Key.MAX_RETRY_TIME); + + // single field | multi fields + this.multiField = writerSliceConfig.getBool(Key.MULTI_FIELD, false); this.ignoreWriteError = writerSliceConfig.getBool(Key.IGNORE_WRITE_ERROR); + + // for tsdb + if (DB_TYPE == SourceDBType.TSDB) { + String address = writerSliceConfig.getString(Key.ENDPOINT); + String database = writerSliceConfig.getString(Key.DATABASE); + String username = writerSliceConfig.getString(Key.USERNAME); + String password = writerSliceConfig.getString(Key.PASSWORD); + this.conn = new TSDBConnection(address, database, username, password); + this.batchSize = writerSliceConfig.getInt(Key.BATCH_SIZE); + this.retrySize = writerSliceConfig.getInt(Key.MAX_RETRY_TIME); + + } else if (DB_TYPE == SourceDBType.RDB) { + // for rdb + int timeSize = 0; + int fieldSize = 0; + int tagSize = 0; + batchSize = writerSliceConfig.getInt(Key.BATCH_SIZE, 100); + List columnName = writerSliceConfig.getList(Key.COLUMN, String.class); + List columnType = writerSliceConfig.getList(Key.COLUMN_TYPE, String.class); + Set typeSet = new HashSet(columnType); + if (columnName.size() != columnType.size()) { + throw DataXException.asDataXException(TSDBWriterErrorCode.ILLEGAL_VALUE, + "The parameter [" + Key.COLUMN_TYPE + "] should has same length with [" + Key.COLUMN + "]."); + } + + for (String type : columnType) { + if (TSDBModel.TSDB_TAG.equals(type)) { + tagSize ++; + } else if (TSDBModel.TSDB_FIELD_DOUBLE.equals(type) || TSDBModel.TSDB_FIELD_STRING.equals(type) + || TSDBModel.TSDB_FIELD_BOOL.equals(type)) { + fieldSize++; + } else if (TSDBModel.TSDB_TIMESTAMP.equals(type)) { + timeSize++; + } + } + + if (fieldSize == 0) { + // compatible with previous usage of TSDB_METRIC_NUM and TSDB_METRIC_STRING + if (!typeSet.contains(TSDBModel.TSDB_METRIC_NUM) && !typeSet.contains(TSDBModel.TSDB_METRIC_STRING)) { + throw DataXException.asDataXException(TSDBWriterErrorCode.ILLEGAL_VALUE, + "The parameter [" + Key.COLUMN_TYPE + "] is invalid, must set at least one of " + + TSDBModel.TSDB_FIELD_DOUBLE + ", " + TSDBModel.TSDB_FIELD_STRING + " or " + TSDBModel.TSDB_FIELD_BOOL + "."); + } + } + + if (tagSize == 0) { + throw DataXException.asDataXException(TSDBWriterErrorCode.ILLEGAL_VALUE, + "The parameter [" + Key.COLUMN_TYPE + "] is invalid, must set " + TSDBModel.TSDB_TAG + ". "); + } + + if (timeSize != 1) { + throw DataXException.asDataXException(TSDBWriterErrorCode.ILLEGAL_VALUE, + "The parameter [" + Key.COLUMN_TYPE + "] is invalid, must set one and only one " + + TSDBModel.TSDB_TIMESTAMP + "."); + } + + if (multiField) { + // check source db type + tableName = writerSliceConfig.getString(Key.TABLE); + if (StringUtils.isBlank(tableName)) { + throw DataXException.asDataXException(TSDBWriterErrorCode.ILLEGAL_VALUE, + "The parameter [" + Key.TABLE + "] h must set when use multi field input."); + } + } + tsdbConverter = new TSDBConverter(columnName, columnType); + + } } @Override @@ -111,30 +265,52 @@ public class TSDBWriter extends Writer { @Override public void startWrite(RecordReceiver recordReceiver) { - try { - Record lastRecord = null; - Record record; - int count = 0; - StringBuilder dps = new StringBuilder(); - while ((record = recordReceiver.getFromReader()) != null) { - final int recordLength = record.getColumnNumber(); - for (int i = 0; i < recordLength; i++) { - dps.append(record.getColumn(i).asString()); - dps.append(","); - count++; - if (count == batchSize) { - count = 0; - batchPut(record, "[" + dps.substring(0, dps.length() - 1) + "]"); - dps = new StringBuilder(); + // for tsdb + if (DB_TYPE == SourceDBType.TSDB) { + try { + Record lastRecord = null; + Record record; + int count = 0; + StringBuilder dps = new StringBuilder(); + while ((record = recordReceiver.getFromReader()) != null) { + final int recordLength = record.getColumnNumber(); + for (int i = 0; i < recordLength; i++) { + dps.append(record.getColumn(i).asString()); + dps.append(","); + count++; + if (count == batchSize) { + count = 0; + batchPut(record, "[" + dps.substring(0, dps.length() - 1) + "]"); + dps = new StringBuilder(); + } } + lastRecord = record; } - lastRecord = record; + if (StringUtils.isNotBlank(dps.toString())) { + batchPut(lastRecord, "[" + dps.substring(0, dps.length() - 1) + "]"); + } + } catch (Exception e) { + throw DataXException.asDataXException(TSDBWriterErrorCode.RUNTIME_EXCEPTION, e); } - if (StringUtils.isNotBlank(dps.toString())) { - batchPut(lastRecord, "[" + dps.substring(0, dps.length() - 1) + "]"); + } else if (DB_TYPE == SourceDBType.RDB) { + // for rdb + List writerBuffer = new ArrayList(this.batchSize); + Record record; + long total = 0; + while ((record = recordReceiver.getFromReader()) != null) { + writerBuffer.add(record); + if (writerBuffer.size() >= this.batchSize) { + total += doBatchInsert(writerBuffer); + writerBuffer.clear(); + } } - } catch (Exception e) { - throw DataXException.asDataXException(TSDBWriterErrorCode.RUNTIME_EXCEPTION, e); + if (!writerBuffer.isEmpty()) { + total += doBatchInsert(writerBuffer); + writerBuffer.clear(); + } + getTaskPluginCollector().collectMessage("write size", total + ""); + LOG.info("Task finished, write size: {}", total); + } } @@ -143,12 +319,13 @@ public class TSDBWriter extends Writer { RetryUtil.executeWithRetry(new Callable() { @Override public Integer call() { - if (!conn.put(dps)) { - getTaskPluginCollector().collectDirtyRecord(record, "Put data points failed!"); - throw DataXException.asDataXException(TSDBWriterErrorCode.RUNTIME_EXCEPTION, - "Put data points failed!"); + final boolean success = multiField ? conn.mput(dps) : conn.put(dps); + if (success) { + return 0; } - return 0; + getTaskPluginCollector().collectDirtyRecord(record, "Put data points failed!"); + throw DataXException.asDataXException(TSDBWriterErrorCode.RUNTIME_EXCEPTION, + "Put data points failed!"); } }, retrySize, 60000L, true); } catch (Exception e) { @@ -160,6 +337,47 @@ public class TSDBWriter extends Writer { } } + private long doBatchInsert(final List writerBuffer) { + int size; + if (ignoreWriteError) { + if (multiField) { + List points = tsdbConverter.transRecord2MultiFieldPoint(writerBuffer, tableName); + size = points.size(); + MultiFieldIgnoreErrorsResult ignoreErrorsResult = tsdb.multiFieldPutSync(points, MultiFieldIgnoreErrorsResult.class); + if (ignoreErrorsResult == null) { + LOG.error("Unexpected inner error for insert"); + } else if (ignoreErrorsResult.getFailed() > 0) { + LOG.error("write TSDB failed num:" + ignoreErrorsResult.getFailed()); + } + } else { + List points = tsdbConverter.transRecord2Point(writerBuffer); + size = points.size(); + IgnoreErrorsResult ignoreErrorsResult = tsdb.putSync(points, IgnoreErrorsResult.class); + if (ignoreErrorsResult == null) { + LOG.error("Unexpected inner error for insert"); + } else if (ignoreErrorsResult.getFailed() > 0) { + LOG.error("write TSDB failed num:" + ignoreErrorsResult.getFailed()); + } + } + } else { + SummaryResult summaryResult; + if (multiField) { + List points = tsdbConverter.transRecord2MultiFieldPoint(writerBuffer, tableName); + size = points.size(); + summaryResult = tsdb.multiFieldPutSync(points, SummaryResult.class); + } else { + List points = tsdbConverter.transRecord2Point(writerBuffer); + size = points.size(); + summaryResult = tsdb.putSync(points, SummaryResult.class); + } + if (summaryResult.getFailed() > 0) { + LOG.error("write TSDB failed num:" + summaryResult.getFailed()); + throw DataXException.asDataXException(TSDBWriterErrorCode.RUNTIME_EXCEPTION, "Write TSDB failed", new Exception()); + } + } + return size; + } + @Override public void post() { } diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriterErrorCode.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriterErrorCode.java index f907fb67..ab4c3894 100755 --- a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriterErrorCode.java +++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriterErrorCode.java @@ -13,6 +13,7 @@ import com.alibaba.datax.common.spi.ErrorCode; public enum TSDBWriterErrorCode implements ErrorCode { REQUIRED_VALUE("TSDBWriter-00", "Missing the necessary value"), + ILLEGAL_VALUE("TSDBWriter-01", "Illegal value"), RUNTIME_EXCEPTION("TSDBWriter-01", "Runtime exception"), RETRY_WRITER_EXCEPTION("TSDBWriter-02", "After repeated attempts, the write still fails"); diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/HttpUtils.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/HttpUtils.java index b81512f7..29b14dab 100644 --- a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/HttpUtils.java +++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/HttpUtils.java @@ -1,11 +1,14 @@ package com.alibaba.datax.plugin.writer.util; import com.alibaba.fastjson.JSON; +import org.apache.commons.lang3.StringUtils; import org.apache.http.client.fluent.Content; import org.apache.http.client.fluent.Request; import org.apache.http.entity.ContentType; import java.nio.charset.Charset; +import java.nio.charset.StandardCharsets; +import java.util.Base64; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -19,43 +22,44 @@ import java.util.concurrent.TimeUnit; */ public final class HttpUtils { - public final static Charset UTF_8 = Charset.forName("UTF-8"); public final static int CONNECT_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60); public final static int SOCKET_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60); + private static final String CREDENTIALS_FORMAT = "%s:%s"; + private static final String BASIC_AUTHENTICATION_FORMAT = "Basic %s"; + private HttpUtils() { } - public static String get(String url) throws Exception { - Content content = Request.Get(url) + public static String get(String url, String username, String password) throws Exception { + final Request request = Request.Get(url) .connectTimeout(CONNECT_TIMEOUT_DEFAULT_IN_MILL) - .socketTimeout(SOCKET_TIMEOUT_DEFAULT_IN_MILL) + .socketTimeout(SOCKET_TIMEOUT_DEFAULT_IN_MILL); + addAuth(request, username, password); + Content content = request .execute() .returnContent(); if (content == null) { return null; } - return content.asString(UTF_8); + return content.asString(StandardCharsets.UTF_8); } - public static String post(String url, Map params) throws Exception { - return post(url, JSON.toJSONString(params), CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL); + public static String post(String url, String username, String password, Map params) throws Exception { + return post(url, username, password, JSON.toJSONString(params), CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL); } - public static String post(String url, String params) throws Exception { - return post(url, params, CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL); + public static String post(String url, String username, String password, String params) throws Exception { + return post(url, username, password, params, CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL); } - public static String post(String url, Map params, + public static String post(String url, String username, String password, String params, int connectTimeoutInMill, int socketTimeoutInMill) throws Exception { - return post(url, JSON.toJSONString(params), connectTimeoutInMill, socketTimeoutInMill); - } - - public static String post(String url, String params, - int connectTimeoutInMill, int socketTimeoutInMill) throws Exception { - Content content = Request.Post(url) + Request request = Request.Post(url) .connectTimeout(connectTimeoutInMill) - .socketTimeout(socketTimeoutInMill) + .socketTimeout(socketTimeoutInMill); + addAuth(request, username, password); + Content content = request .addHeader("Content-Type", "application/json") .bodyString(params, ContentType.APPLICATION_JSON) .execute() @@ -63,6 +67,22 @@ public final class HttpUtils { if (content == null) { return null; } - return content.asString(UTF_8); + return content.asString(StandardCharsets.UTF_8); + } + + private static void addAuth(Request request, String username, String password) { + String authorization = generateHttpAuthorization(username, password); + if (authorization != null) { + request.setHeader("Authorization", authorization); + } + } + + private static String generateHttpAuthorization(String username, String password) { + if (StringUtils.isBlank(username) || StringUtils.isBlank(password)) { + return null; + } + String credentials = String.format(CREDENTIALS_FORMAT, username, password); + credentials = Base64.getEncoder().encodeToString(credentials.getBytes()); + return String.format(BASIC_AUTHENTICATION_FORMAT, credentials); } } diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/TSDBUtils.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/TSDBUtils.java index ed01d877..d57c5935 100644 --- a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/TSDBUtils.java +++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/TSDBUtils.java @@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.writer.util; import com.alibaba.datax.plugin.writer.conn.DataPoint4TSDB; import com.alibaba.fastjson.JSON; +import org.apache.commons.lang3.StringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -22,45 +23,56 @@ public final class TSDBUtils { private TSDBUtils() { } - public static String version(String address) { + public static String version(String address, String username, String password) { String url = String.format("%s/api/version", address); String rsp; try { - rsp = HttpUtils.get(url); + rsp = HttpUtils.get(url, username, password); } catch (Exception e) { throw new RuntimeException(e); } return rsp; } - public static String config(String address) { + public static String config(String address, String username, String password) { String url = String.format("%s/api/config", address); String rsp; try { - rsp = HttpUtils.get(url); + rsp = HttpUtils.get(url, username, password); } catch (Exception e) { throw new RuntimeException(e); } return rsp; } - public static boolean put(String address, List dps) { - return put(address, JSON.toJSON(dps)); + public static boolean put(String address, String database, String username, String password, List dps) { + return put(address, database, username, password, JSON.toJSON(dps)); } - public static boolean put(String address, DataPoint4TSDB dp) { - return put(address, JSON.toJSON(dp)); + public static boolean put(String address, String database, String username, String password, DataPoint4TSDB dp) { + return put(address, database, username, password, JSON.toJSON(dp)); } - private static boolean put(String address, Object o) { - return put(address, o.toString()); + private static boolean put(String address, String database, String username, String password, Object o) { + return put(address, database, username, password, o.toString()); } - public static boolean put(String address, String s) { - String url = String.format("%s/api/put", address); + public static boolean put(String address, String database, String username, String password, String s) { + return put(address, database, username, password, s, false); + } + + public static boolean mput(String address, String database, String username, String password, String s) { + return put(address, database, username, password, s, true); + } + + public static boolean put(String address, String database, String username, String password, String s, boolean multiField) { + String url = address + (multiField ? "/api/mput" : "/api/put"); + if (!StringUtils.isBlank(database)) { + url = url.concat("?db=" + database); + } String rsp; try { - rsp = HttpUtils.post(url, s); + rsp = HttpUtils.post(url, username, password, s); // If successful, the returned content should be null. assert rsp == null; } catch (Exception e) {