Merge pull request #1245 from JohnnyJie/tsdb

feat: tsdb2tsdb [mput/username/password/database]
This commit is contained in:
Trafalgar 2021-12-15 11:04:35 +08:00 committed by GitHub
commit b984f4bfc7
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
20 changed files with 805 additions and 196 deletions

View File

@ -357,6 +357,13 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>tsdbreader/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>adbpgwriter/target/datax/</directory>
<includes>

View File

@ -16,6 +16,8 @@ public final class Constant {
static final String DEFAULT_DATA_FORMAT = "yyyy-MM-dd HH:mm:ss";
public static final String METRIC_SPECIFY_KEY = "__metric__";
public static final String METRIC_SPECIFY_KEY_PREFIX = METRIC_SPECIFY_KEY + ".";
public static final int METRIC_SPECIFY_KEY_PREFIX_LENGTH = METRIC_SPECIFY_KEY_PREFIX.length();
public static final String TS_SPECIFY_KEY = "__ts__";
public static final String VALUE_SPECIFY_KEY = "__value__";

View File

@ -17,14 +17,19 @@ public class Key {
// RDB for MySQL / ADB etc.
static final String SINK_DB_TYPE = "sinkDbType";
static final String ENDPOINT = "endpoint";
static final String USERNAME = "username";
static final String PASSWORD = "password";
static final String COLUMN = "column";
static final String METRIC = "metric";
static final String FIELD = "field";
static final String TAG = "tag";
static final String COMBINE = "combine";
static final String INTERVAL_DATE_TIME = "splitIntervalMs";
static final String BEGIN_DATE_TIME = "beginDateTime";
static final String END_DATE_TIME = "endDateTime";
static final String HINT = "hint";
static final Boolean COMBINE_DEFAULT_VALUE = false;
static final Integer INTERVAL_DATE_TIME_DEFAULT_VALUE = 60;
static final String TYPE_DEFAULT_VALUE = "TSDB";
static final Set<String> TYPE_SET = new HashSet<>();

View File

@ -60,6 +60,15 @@ public class TSDBReader extends Reader {
"The parameter [" + Key.ENDPOINT + "] is not set.");
}
String username = originalConfig.getString(Key.USERNAME, null);
if (StringUtils.isBlank(username)) {
LOG.warn("The parameter [" + Key.USERNAME + "] is blank.");
}
String password = originalConfig.getString(Key.PASSWORD, null);
if (StringUtils.isBlank(password)) {
LOG.warn("The parameter [" + Key.PASSWORD + "] is blank.");
}
// tagK / field could be empty
if ("TSDB".equals(type)) {
List<String> columns = originalConfig.getList(Key.COLUMN, String.class);
@ -76,7 +85,14 @@ public class TSDBReader extends Reader {
"The parameter [" + Key.COLUMN + "] is not set.");
}
for (String specifyKey : Constant.MUST_CONTAINED_SPECIFY_KEYS) {
if (!columns.contains(specifyKey)) {
boolean containSpecifyKey = false;
for (String column : columns) {
if (column.startsWith(specifyKey)) {
containSpecifyKey = true;
break;
}
}
if (!containSpecifyKey) {
throw DataXException.asDataXException(
TSDBReaderErrorCode.ILLEGAL_VALUE,
"The parameter [" + Key.COLUMN + "] should contain "
@ -99,6 +115,8 @@ public class TSDBReader extends Reader {
"The parameter [" + Key.INTERVAL_DATE_TIME + "] should be great than zero.");
}
Boolean isCombine = originalConfig.getBool(Key.COMBINE, Key.COMBINE_DEFAULT_VALUE);
SimpleDateFormat format = new SimpleDateFormat(Constant.DEFAULT_DATA_FORMAT);
String startTime = originalConfig.getString(Key.BEGIN_DATE_TIME);
Long startDate;
@ -168,14 +186,14 @@ public class TSDBReader extends Reader {
startTime = format.parse(originalConfig.getString(Key.BEGIN_DATE_TIME)).getTime();
} catch (ParseException e) {
throw DataXException.asDataXException(
TSDBReaderErrorCode.ILLEGAL_VALUE, "解析[" + Key.BEGIN_DATE_TIME + "]失败.", e);
TSDBReaderErrorCode.ILLEGAL_VALUE, "Analysis [" + Key.BEGIN_DATE_TIME + "] failed.", e);
}
long endTime;
try {
endTime = format.parse(originalConfig.getString(Key.END_DATE_TIME)).getTime();
} catch (ParseException e) {
throw DataXException.asDataXException(
TSDBReaderErrorCode.ILLEGAL_VALUE, "解析[" + Key.END_DATE_TIME + "]失败.", e);
TSDBReaderErrorCode.ILLEGAL_VALUE, "Analysis [" + Key.END_DATE_TIME + "] failed.", e);
}
if (TimeUtils.isSecond(startTime)) {
startTime *= 1000;
@ -186,13 +204,14 @@ public class TSDBReader extends Reader {
DateTime startDateTime = new DateTime(TimeUtils.getTimeInHour(startTime));
DateTime endDateTime = new DateTime(TimeUtils.getTimeInHour(endTime));
final Boolean isCombine = originalConfig.getBool(Key.COMBINE, Key.COMBINE_DEFAULT_VALUE);
if ("TSDB".equals(type)) {
// split by metric
for (String column : columns4TSDB) {
if (isCombine) {
// split by time in hour
while (startDateTime.isBefore(endDateTime)) {
Configuration clone = this.originalConfig.clone();
clone.set(Key.COLUMN, Collections.singletonList(column));
clone.set(Key.COLUMN, columns4TSDB);
clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis());
startDateTime = startDateTime.plusMillis(splitIntervalMs);
@ -202,15 +221,30 @@ public class TSDBReader extends Reader {
LOG.info("Configuration: {}", JSON.toJSONString(clone));
}
} else {
// split by time in hour
while (startDateTime.isBefore(endDateTime)) {
// split by metric
for (String column : columns4TSDB) {
Configuration clone = this.originalConfig.clone();
clone.set(Key.COLUMN, Collections.singletonList(column));
clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis());
startDateTime = startDateTime.plusMillis(splitIntervalMs);
// Make sure the time interval is [start, end).
clone.set(Key.END_DATE_TIME, startDateTime.getMillis() - 1);
configurations.add(clone);
LOG.info("Configuration: {}", JSON.toJSONString(clone));
}
}
}
} else {
// split by metric
for (String metric : metrics) {
// split by time in hour
if (isCombine) {
while (startDateTime.isBefore(endDateTime)) {
Configuration clone = this.originalConfig.clone();
clone.set(Key.COLUMN, columns4RDB);
clone.set(Key.METRIC, Collections.singletonList(metric));
clone.set(Key.METRIC, metrics);
clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis());
startDateTime = startDateTime.plusMillis(splitIntervalMs);
@ -220,6 +254,24 @@ public class TSDBReader extends Reader {
LOG.info("Configuration: {}", JSON.toJSONString(clone));
}
} else {
// split by time in hour
while (startDateTime.isBefore(endDateTime)) {
// split by metric
for (String metric : metrics) {
Configuration clone = this.originalConfig.clone();
clone.set(Key.COLUMN, columns4RDB);
clone.set(Key.METRIC, Collections.singletonList(metric));
clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis());
startDateTime = startDateTime.plusMillis(splitIntervalMs);
// Make sure the time interval is [start, end).
clone.set(Key.END_DATE_TIME, startDateTime.getMillis() - 1);
configurations.add(clone);
LOG.info("Configuration: {}", JSON.toJSONString(clone));
}
}
}
}
return configurations;
@ -247,6 +299,8 @@ public class TSDBReader extends Reader {
private TSDBConnection conn;
private Long startTime;
private Long endTime;
private Boolean isCombine;
private Map<String, Object> hint;
@Override
public void init() {
@ -265,11 +319,16 @@ public class TSDBReader extends Reader {
this.tags = readerSliceConfig.getMap(Key.TAG);
String address = readerSliceConfig.getString(Key.ENDPOINT);
String username = readerSliceConfig.getString(Key.USERNAME);
String password = readerSliceConfig.getString(Key.PASSWORD);
conn = new TSDBConnection(address);
conn = new TSDBConnection(address, username, password);
this.startTime = readerSliceConfig.getLong(Key.BEGIN_DATE_TIME);
this.endTime = readerSliceConfig.getLong(Key.END_DATE_TIME);
this.isCombine = readerSliceConfig.getBool(Key.COMBINE, Key.COMBINE_DEFAULT_VALUE);
this.hint = readerSliceConfig.getMap(Key.HINT);
}
@Override
@ -283,29 +342,35 @@ public class TSDBReader extends Reader {
if ("TSDB".equals(type)) {
for (String metric : columns4TSDB) {
final Map<String, String> tags = this.tags == null ?
null : (Map<String, String>) this.tags.get(metric);
null : (Map<String, String>) this.tags.get(metric);
if (fields == null || !fields.containsKey(metric)) {
conn.sendDPs(metric, tags, this.startTime, this.endTime, recordSender);
conn.sendDPs(metric, tags, this.startTime, this.endTime, recordSender, hint);
} else {
conn.sendDPs(metric, (List<String>) fields.get(metric),
tags, this.startTime, this.endTime, recordSender);
tags, this.startTime, this.endTime, recordSender, hint);
}
}
} else {
for (String metric : metrics) {
if (isCombine) {
final Map<String, String> tags = this.tags == null ?
null : (Map<String, String>) this.tags.get(metric);
if (fields == null || !fields.containsKey(metric)) {
conn.sendRecords(metric, tags, startTime, endTime, columns4RDB, recordSender);
} else {
conn.sendRecords(metric, (List<String>) fields.get(metric),
tags, startTime, endTime, columns4RDB, recordSender);
null : (Map<String, String>) this.tags.get(metrics.get(0));
conn.sendRecords(metrics, tags, startTime, endTime, columns4RDB, recordSender, hint);
} else {
for (String metric : metrics) {
final Map<String, String> tags = this.tags == null ?
null : (Map<String, String>) this.tags.get(metric);
if (fields == null || !fields.containsKey(metric)) {
conn.sendRecords(metric, tags, startTime, endTime, columns4RDB, isCombine, recordSender, hint);
} else {
conn.sendRecords(metric, (List<String>) fields.get(metric),
tags, startTime, endTime, columns4RDB, recordSender, hint);
}
}
}
}
} catch (Exception e) {
throw DataXException.asDataXException(
TSDBReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错!", e);
TSDBReaderErrorCode.ILLEGAL_VALUE, "Error in getting or sending data point", e);
}
}

View File

@ -22,6 +22,20 @@ public interface Connection4TSDB {
*/
String address();
/**
* Get the address of Database.
*
* @return host+ip
*/
String username();
/**
* Get the address of Database.
*
* @return host+ip
*/
String password();
/**
* Get the version of Database.
*
@ -46,22 +60,27 @@ public interface Connection4TSDB {
/**
* Send data points for TSDB with single field.
*/
void sendDPs(String metric, Map<String, String> tags, Long start, Long end, RecordSender recordSender) throws Exception;
void sendDPs(String metric, Map<String, String> tags, Long start, Long end, RecordSender recordSender, Map<String, Object> hint) throws Exception;
/**
* Send data points for TSDB with multi fields.
*/
void sendDPs(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, RecordSender recordSender) throws Exception;
void sendDPs(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, RecordSender recordSender, Map<String, Object> hint) throws Exception;
/**
* Send data points for RDB with single field.
*/
void sendRecords(String metric, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender) throws Exception;
void sendRecords(String metric, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, Boolean isCombine, RecordSender recordSender, Map<String, Object> hint) throws Exception;
/**
* Send data points for RDB with multi fields.
*/
void sendRecords(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender) throws Exception;
void sendRecords(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender, Map<String, Object> hint) throws Exception;
/**
* Send data points for RDB with single fields on combine mode.
*/
void sendRecords(List<String> metrics, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender, Map<String, Object> hint) throws Exception;
/**
* Put data point.

View File

@ -19,9 +19,13 @@ import java.util.Map;
public class TSDBConnection implements Connection4TSDB {
private String address;
private String username;
private String password;
public TSDBConnection(String address) {
public TSDBConnection(String address, String username, String password) {
this.address = address;
this.username = username;
this.password = password;
}
@Override
@ -29,14 +33,24 @@ public class TSDBConnection implements Connection4TSDB {
return address;
}
@Override
public String username() {
return username;
}
@Override
public String password() {
return password;
}
@Override
public String version() {
return TSDBUtils.version(address);
return TSDBUtils.version(address, username, password);
}
@Override
public String config() {
return TSDBUtils.config(address);
return TSDBUtils.config(address, username, password);
}
@Override
@ -45,23 +59,28 @@ public class TSDBConnection implements Connection4TSDB {
}
@Override
public void sendDPs(String metric, Map<String, String> tags, Long start, Long end, RecordSender recordSender) throws Exception {
TSDBDump.dump4TSDB(this, metric, tags, start, end, recordSender);
public void sendDPs(String metric, Map<String, String> tags, Long start, Long end, RecordSender recordSender, Map<String, Object> hint) throws Exception {
TSDBDump.dump4TSDB(this, metric, tags, start, end, recordSender, hint);
}
@Override
public void sendDPs(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, RecordSender recordSender) throws Exception {
TSDBDump.dump4TSDB(this, metric, fields, tags, start, end, recordSender);
public void sendDPs(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, RecordSender recordSender, Map<String, Object> hint) throws Exception {
TSDBDump.dump4TSDB(this, metric, fields, tags, start, end, recordSender, hint);
}
@Override
public void sendRecords(String metric, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender) throws Exception {
TSDBDump.dump4RDB(this, metric, tags, start, end, columns4RDB, recordSender);
public void sendRecords(String metric, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, Boolean isCombine, RecordSender recordSender, Map<String, Object> hint) throws Exception {
TSDBDump.dump4RDB(this, metric, tags, start, end, columns4RDB, recordSender, hint);
}
@Override
public void sendRecords(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender) throws Exception {
TSDBDump.dump4RDB(this, metric, fields, tags, start, end, columns4RDB, recordSender);
public void sendRecords(List<String> metrics, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender, Map<String, Object> hint) throws Exception {
TSDBDump.dump4RDB(this, metrics, tags, start, end, columns4RDB, recordSender, hint);
}
@Override
public void sendRecords(String metric, List<String> fields, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender recordSender, Map<String, Object> hint) throws Exception {
TSDBDump.dump4RDB(this, metric, fields, tags, start, end, columns4RDB, recordSender, hint);
}
@Override

View File

@ -9,10 +9,9 @@ import com.alibaba.fastjson.parser.Feature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.*;
import static com.alibaba.datax.plugin.reader.tsdbreader.Constant.METRIC_SPECIFY_KEY_PREFIX_LENGTH;
/**
* Copyright @ 2019 alibaba.com
@ -37,10 +36,10 @@ final class TSDBDump {
}
static void dump4TSDB(TSDBConnection conn, String metric, Map<String, String> tags,
Long start, Long end, RecordSender sender) throws Exception {
Long start, Long end, RecordSender sender, Map<String, Object> hint) throws Exception {
LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end);
String res = queryRange4SingleField(conn, metric, tags, start, end);
String res = queryRange4SingleField(conn, metric, tags, start, end, hint);
List<String> dps = getDps4TSDB(metric, res);
if (dps == null || dps.isEmpty()) {
return;
@ -49,10 +48,10 @@ final class TSDBDump {
}
static void dump4TSDB(TSDBConnection conn, String metric, List<String> fields, Map<String, String> tags,
Long start, Long end, RecordSender sender) throws Exception {
Long start, Long end, RecordSender sender, Map<String, Object> hint) throws Exception {
LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end);
String res = queryRange4MultiFields(conn, metric, fields, tags, start, end);
String res = queryRange4MultiFields(conn, metric, fields, tags, start, end, hint);
List<String> dps = getDps4TSDB(metric, fields, res);
if (dps == null || dps.isEmpty()) {
return;
@ -61,10 +60,10 @@ final class TSDBDump {
}
static void dump4RDB(TSDBConnection conn, String metric, Map<String, String> tags,
Long start, Long end, List<String> columns4RDB, RecordSender sender) throws Exception {
Long start, Long end, List<String> columns4RDB, RecordSender sender, Map<String, Object> hint) throws Exception {
LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end);
String res = queryRange4SingleField(conn, metric, tags, start, end);
String res = queryRange4SingleField(conn, metric, tags, start, end, hint);
List<DataPoint4TSDB> dps = getDps4RDB(metric, res);
if (dps == null || dps.isEmpty()) {
return;
@ -92,12 +91,71 @@ final class TSDBDump {
}
}
public static void dump4RDB(TSDBConnection conn, List<String> metrics, Map<String, String> tags, Long start, Long end, List<String> columns4RDB, RecordSender sender, Map<String, Object> hint) throws Exception {
LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metrics, start, end);
List<DataPoint4TSDB> dps = new LinkedList<>();
for (String metric : metrics) {
String res = queryRange4SingleField(conn, metric, tags, start, end, hint);
final List<DataPoint4TSDB> dpList = getDps4RDB(metric, res);
if (dpList == null || dpList.isEmpty()) {
continue;
}
dps.addAll(dpList);
}
if (dps.isEmpty()) {
return;
}
Map<Long, Map<String, DataPoint4TSDB>> dpsCombinedByTs = new LinkedHashMap<>();
for (DataPoint4TSDB dp : dps) {
final long ts = dp.getTimestamp();
final Map<String, DataPoint4TSDB> dpsWithSameTs = dpsCombinedByTs.computeIfAbsent(ts, k -> new LinkedHashMap<>());
dpsWithSameTs.put(dp.getMetric(), dp);
}
for (Map.Entry<Long, Map<String, DataPoint4TSDB>> entry : dpsCombinedByTs.entrySet()) {
final Long ts = entry.getKey();
final Map<String, DataPoint4TSDB> metricAndDps = entry.getValue();
final Record record = sender.createRecord();
DataPoint4TSDB tmpDp = null;
for (final String column : columns4RDB) {
if (column.startsWith(Constant.METRIC_SPECIFY_KEY)) {
final String m = column.substring(METRIC_SPECIFY_KEY_PREFIX_LENGTH);
tmpDp = metricAndDps.get(m);
if (tmpDp == null) {
continue;
}
record.addColumn(getColumn(tmpDp.getValue()));
} else if (Constant.TS_SPECIFY_KEY.equals(column)) {
record.addColumn(new LongColumn(ts));
} else if (Constant.VALUE_SPECIFY_KEY.equals(column)) {
// combine 模式下不应该定义 __value__ 字段因为 __metric__.xxx 字段会输出对应的 value
throw new RuntimeException("The " + Constant.VALUE_SPECIFY_KEY +
" column should not be specified in combine mode!");
} else {
// combine 模式下应该确保 __metric__.xxx 字段的定义放在 column 数组的最前面以保证获取到 metric
if (tmpDp == null) {
throw new RuntimeException("These " + Constant.METRIC_SPECIFY_KEY_PREFIX +
" column should be placed first in the column array in combine mode!");
}
final Object tagv = tmpDp.getTags().get(column);
if (tagv == null) {
continue;
}
record.addColumn(getColumn(tagv));
}
}
sender.sendToWriter(record);
}
}
static void dump4RDB(TSDBConnection conn, String metric, List<String> fields,
Map<String, String> tags, Long start, Long end,
List<String> columns4RDB, RecordSender sender) throws Exception {
List<String> columns4RDB, RecordSender sender, Map<String, Object> hint) throws Exception {
LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end);
String res = queryRange4MultiFields(conn, metric, fields, tags, start, end);
String res = queryRange4MultiFields(conn, metric, fields, tags, start, end, hint);
List<DataPoint4TSDB> dps = getDps4RDB(metric, fields, res);
if (dps == null || dps.isEmpty()) {
return;
@ -131,14 +189,16 @@ final class TSDBDump {
valueColumn = new LongColumn((Long) value);
} else if (value instanceof String) {
valueColumn = new StringColumn((String) value);
} else if (value instanceof Integer) {
valueColumn = new LongColumn(((Integer)value).longValue());
} else {
throw new Exception(String.format("value 不支持类型: [%s]", value.getClass().getSimpleName()));
throw new Exception(String.format("value not supported type: [%s]", value.getClass().getSimpleName()));
}
return valueColumn;
}
private static String queryRange4SingleField(TSDBConnection conn, String metric, Map<String, String> tags,
Long start, Long end) throws Exception {
Long start, Long end, Map<String, Object> hint) throws Exception {
String tagKV = getFilterByTags(tags);
String body = "{\n" +
" \"start\": " + start + ",\n" +
@ -148,14 +208,15 @@ final class TSDBDump {
" \"aggregator\": \"none\",\n" +
" \"metric\": \"" + metric + "\"\n" +
(tagKV == null ? "" : tagKV) +
(hint == null ? "" : (", \"hint\": " + JSON.toJSONString(hint))) +
" }\n" +
" ]\n" +
"}";
return HttpUtils.post(conn.address() + QUERY, body);
return HttpUtils.post(conn.address() + QUERY, conn.username(), conn.password(), body);
}
private static String queryRange4MultiFields(TSDBConnection conn, String metric, List<String> fields,
Map<String, String> tags, Long start, Long end) throws Exception {
Map<String, String> tags, Long start, Long end, Map<String, Object> hint) throws Exception {
// fields
StringBuilder fieldBuilder = new StringBuilder();
fieldBuilder.append("\"fields\":[");
@ -177,10 +238,11 @@ final class TSDBDump {
" \"metric\": \"" + metric + "\",\n" +
fieldBuilder.toString() +
(tagKV == null ? "" : tagKV) +
(hint == null ? "" : (", \"hint\": " + JSON.toJSONString(hint))) +
" }\n" +
" ]\n" +
"}";
return HttpUtils.post(conn.address() + QUERY_MULTI_FIELD, body);
return HttpUtils.post(conn.address() + QUERY_MULTI_FIELD, conn.username(), conn.password(), body);
}
private static String getFilterByTags(Map<String, String> tags) {

View File

@ -1,11 +1,13 @@
package com.alibaba.datax.plugin.reader.tsdbreader.util;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.fluent.Content;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.ContentType;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -22,13 +24,18 @@ public final class HttpUtils {
public final static int CONNECT_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60);
public final static int SOCKET_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60);
private static final String CREDENTIALS_FORMAT = "%s:%s";
private static final String BASIC_AUTHENTICATION_FORMAT = "Basic %s";
private HttpUtils() {
}
public static String get(String url) throws Exception {
Content content = Request.Get(url)
public static String get(String url, String username, String password) throws Exception {
final Request request = Request.Get(url)
.connectTimeout(CONNECT_TIMEOUT_DEFAULT_IN_MILL)
.socketTimeout(SOCKET_TIMEOUT_DEFAULT_IN_MILL)
.socketTimeout(SOCKET_TIMEOUT_DEFAULT_IN_MILL);
addAuth(request, username, password);
Content content = request
.execute()
.returnContent();
if (content == null) {
@ -37,24 +44,21 @@ public final class HttpUtils {
return content.asString(StandardCharsets.UTF_8);
}
public static String post(String url, Map<String, Object> params) throws Exception {
return post(url, JSON.toJSONString(params), CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
public static String post(String url, String username, String password, Map<String, Object> params) throws Exception {
return post(url, username, password, JSON.toJSONString(params), CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
}
public static String post(String url, String params) throws Exception {
return post(url, params, CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
public static String post(String url, String username, String password, String params) throws Exception {
return post(url, username, password, params, CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
}
public static String post(String url, Map<String, Object> params,
public static String post(String url, String username, String password, String params,
int connectTimeoutInMill, int socketTimeoutInMill) throws Exception {
return post(url, JSON.toJSONString(params), connectTimeoutInMill, socketTimeoutInMill);
}
public static String post(String url, String params,
int connectTimeoutInMill, int socketTimeoutInMill) throws Exception {
Content content = Request.Post(url)
Request request = Request.Post(url)
.connectTimeout(connectTimeoutInMill)
.socketTimeout(socketTimeoutInMill)
.socketTimeout(socketTimeoutInMill);
addAuth(request, username, password);
Content content = request
.addHeader("Content-Type", "application/json")
.bodyString(params, ContentType.APPLICATION_JSON)
.execute()
@ -64,4 +68,20 @@ public final class HttpUtils {
}
return content.asString(StandardCharsets.UTF_8);
}
private static void addAuth(Request request, String username, String password) {
String authorization = generateHttpAuthorization(username, password);
if (authorization != null) {
request.setHeader("Authorization", authorization);
}
}
private static String generateHttpAuthorization(String username, String password) {
if (StringUtils.isBlank(username) || StringUtils.isBlank(password)) {
return null;
}
String credentials = String.format(CREDENTIALS_FORMAT, username, password);
credentials = Base64.getEncoder().encodeToString(credentials.getBytes());
return String.format(BASIC_AUTHENTICATION_FORMAT, credentials);
}
}

View File

@ -1,11 +1,5 @@
package com.alibaba.datax.plugin.reader.tsdbreader.util;
import com.alibaba.datax.plugin.reader.tsdbreader.conn.DataPoint4TSDB;
import com.alibaba.fastjson.JSON;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
/**
* Copyright @ 2019 alibaba.com
@ -17,52 +11,28 @@ import java.util.List;
*/
public final class TSDBUtils {
private static final Logger LOGGER = LoggerFactory.getLogger(TSDBUtils.class);
private TSDBUtils() {
}
public static String version(String address) {
public static String version(String address, String username, String password) {
String url = String.format("%s/api/version", address);
String rsp;
try {
rsp = HttpUtils.get(url);
rsp = HttpUtils.get(url, username, password);
} catch (Exception e) {
throw new RuntimeException(e);
}
return rsp;
}
public static String config(String address) {
public static String config(String address, String username, String password) {
String url = String.format("%s/api/config", address);
String rsp;
try {
rsp = HttpUtils.get(url);
rsp = HttpUtils.get(url, username, password);
} catch (Exception e) {
throw new RuntimeException(e);
}
return rsp;
}
public static boolean put(String address, List<DataPoint4TSDB> dps) {
return put(address, JSON.toJSON(dps));
}
public static boolean put(String address, DataPoint4TSDB dp) {
return put(address, JSON.toJSON(dp));
}
private static boolean put(String address, Object o) {
String url = String.format("%s/api/put", address);
String rsp;
try {
rsp = HttpUtils.post(url, o.toString());
// If successful, the returned content should be null.
assert rsp == null;
} catch (Exception e) {
LOGGER.error("Address: {}, DataPoints: {}", url, o);
throw new RuntimeException(e);
}
return true;
}
}

View File

@ -91,6 +91,13 @@
<version>${fastjson.version}</version>
</dependency>
<!-- tsdb -->
<dependency>
<groupId>com.aliyun</groupId>
<artifactId>hitsdb-client</artifactId>
<version>0.4.0-SNAPSHOT</version>
</dependency>
<!-- test -->
<dependency>
<groupId>junit</groupId>

View File

@ -21,6 +21,28 @@ public interface Connection4TSDB {
*/
String address();
/**
* Get the setted database name.
*
* @return database
*/
String database();
/**
* Get the username of Database.
*
* @return username
*/
String username();
/**
* Get the password of Database.
*
* @return password
*/
String password();
/**
* Get the version of Database.
*
@ -69,17 +91,25 @@ public interface Connection4TSDB {
boolean put(List<DataPoint4TSDB> dps);
/**
* Put data points.
* Put data points with single field.
*
* @param dps data points
* @return whether the data point is written successfully
*/
boolean put(String dps);
/**
* Put data points with multi fields.
*
* @param dps data points
* @return whether the data point is written successfully
*/
boolean mput(String dps);
/**
* Whether current version is supported.
*
* @return true: supported; false: not yet!
*/
boolean isSupported();
}
}

View File

@ -18,12 +18,18 @@ import java.util.List;
public class TSDBConnection implements Connection4TSDB {
private String address;
private String username;
private String password;
private String database;
public TSDBConnection(String address) {
public TSDBConnection(String address, String database, String username, String password) {
if (StringUtils.isBlank(address)) {
throw new RuntimeException("TSDBConnection init failed because address is blank!");
}
this.address = address;
this.database = database;
this.username = username;
this.password = password;
}
@Override
@ -31,14 +37,29 @@ public class TSDBConnection implements Connection4TSDB {
return address;
}
@Override
public String username() {
return username;
}
@Override
public String database() {
return database;
}
@Override
public String password() {
return password;
}
@Override
public String version() {
return TSDBUtils.version(address);
return TSDBUtils.version(address, username, password);
}
@Override
public String config() {
return TSDBUtils.config(address);
return TSDBUtils.config(address, username, password);
}
@Override
@ -53,17 +74,22 @@ public class TSDBConnection implements Connection4TSDB {
@Override
public boolean put(DataPoint4TSDB dp) {
return TSDBUtils.put(address, dp);
return TSDBUtils.put(address, database, username, password, dp);
}
@Override
public boolean put(List<DataPoint4TSDB> dps) {
return TSDBUtils.put(address, dps);
return TSDBUtils.put(address, database, username, password, dps);
}
@Override
public boolean put(String dps) {
return TSDBUtils.put(address, dps);
return TSDBUtils.put(address, database, username, password, dps);
}
@Override
public boolean mput(String dps) {
return TSDBUtils.mput(address, database, username, password, dps);
}
@Override

View File

@ -10,8 +10,22 @@ package com.alibaba.datax.plugin.writer.tsdbwriter;
*/
public class Key {
static final String SOURCE_DB_TYPE = "sourceDbType";
static final String MULTI_FIELD = "multiField";
// common
static final String ENDPOINT = "endpoint";
static final String USERNAME = "username";
static final String PASSWORD = "password";
static final String IGNORE_WRITE_ERROR = "ignoreWriteError";
static final String DATABASE = "database";
// for tsdb
static final String BATCH_SIZE = "batchSize";
static final String MAX_RETRY_TIME = "maxRetryTime";
static final String IGNORE_WRITE_ERROR = "ignoreWriteError";
// for rdb
static final String COLUMN = "column";
static final String COLUMN_TYPE = "columnType";
static final String TABLE = "table";
}

View File

@ -0,0 +1,5 @@
package com.alibaba.datax.plugin.writer.tsdbwriter;
public enum SourceDBType {
TSDB, RDB
}

View File

@ -0,0 +1,96 @@
package com.alibaba.datax.plugin.writer.tsdbwriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.fastjson.JSON;
import com.aliyun.hitsdb.client.value.request.MultiFieldPoint;
import com.aliyun.hitsdb.client.value.request.Point;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
class TSDBConverter {
private static final Logger LOG = LoggerFactory.getLogger(TSDBConverter.class);
private List<String> columnName;
private List<String> columnType;
TSDBConverter(List<String> columnName, List<String> columnType) {
this.columnName = columnName;
this.columnType = columnType;
LOG.info("columnName: {}, columnType: {}", JSON.toJSONString(columnName), JSON.toJSONString(columnType));
}
List<Point> transRecord2Point(List<Record> records) {
List<Point> dps = new ArrayList<Point>();
for (Record record : records) {
List<Point.MetricBuilder> metricBuilders = new ArrayList<Point.MetricBuilder>();
Map<String, String> tags = new HashMap<String, String>();
Long time = 0L;
for (int i = 0; i < columnType.size(); i++) {
String type = columnType.get(i);
String name = columnName.get(i);
Column column = record.getColumn(i);
if (TSDBModel.TSDB_TAG.equals(type)) {
tags.put(name, column.asString());
} else if (TSDBModel.TSDB_FIELD_DOUBLE.equals(type)) {
metricBuilders.add(new Point.MetricBuilder(name).value(column.asDouble()));
} else if (TSDBModel.TSDB_FIELD_STRING.equals(type)) {
metricBuilders.add(new Point.MetricBuilder(name).value(column.asString()));
} else if (TSDBModel.TSDB_FIELD_BOOL.equals(type)) {
metricBuilders.add(new Point.MetricBuilder(name).value(column.asBoolean()));
} else if (TSDBModel.TSDB_TIMESTAMP.equals(type)) {
time = column.asLong();
} else if (TSDBModel.TSDB_METRIC_NUM.equals(type)) {
// compatible with previous usage of TSDB_METRIC_NUM
metricBuilders.add(new Point.MetricBuilder(name).value(column.asDouble()));
} else if (TSDBModel.TSDB_METRIC_STRING.equals(type)) {
// compatible with previous usage of TSDB_METRIC_STRING
metricBuilders.add(new Point.MetricBuilder(name).value(column.asString()));
}
}
for (Point.MetricBuilder metricBuilder : metricBuilders) {
dps.add(metricBuilder.tag(tags).timestamp(time).build(false));
}
}
return dps;
}
List<MultiFieldPoint> transRecord2MultiFieldPoint(List<Record> records, String tableName) {
List<MultiFieldPoint> dps = new ArrayList<MultiFieldPoint>();
for (Record record : records) {
MultiFieldPoint.MetricBuilder builder = MultiFieldPoint.metric(tableName);
for (int i = 0; i < columnType.size(); i++) {
String type = columnType.get(i);
String name = columnName.get(i);
Column column = record.getColumn(i);
if (TSDBModel.TSDB_TAG.equals(type)) {
builder.tag(name, column.asString());
} else if (TSDBModel.TSDB_FIELD_DOUBLE.equals(type)) {
builder.field(name, column.asDouble());
} else if (TSDBModel.TSDB_FIELD_STRING.equals(type)) {
builder.field(name, column.asString());
} else if (TSDBModel.TSDB_FIELD_BOOL.equals(type)) {
builder.field(name, column.asBoolean());
} else if (TSDBModel.TSDB_TIMESTAMP.equals(type)) {
builder.timestamp(column.asLong());
} else if (TSDBModel.TSDB_METRIC_NUM.equals(type)) {
// compatible with previous usage of TSDB_METRIC_NUM
builder.field(name, column.asDouble());
} else if (TSDBModel.TSDB_METRIC_STRING.equals(type)) {
// compatible with previous usage of TSDB_METRIC_STRING
builder.field(name, column.asString());
}
}
MultiFieldPoint point = builder.build(false);
dps.add(point);
}
return dps;
}
}

View File

@ -0,0 +1,11 @@
package com.alibaba.datax.plugin.writer.tsdbwriter;
class TSDBModel {
static final String TSDB_METRIC_NUM = "metric_num";
static final String TSDB_METRIC_STRING = "metric_string";
static final String TSDB_TAG = "tag";
static final String TSDB_TIMESTAMP = "timestamp";
static final String TSDB_FIELD_DOUBLE = "field_double";
static final String TSDB_FIELD_STRING = "field_string";
static final String TSDB_FIELD_BOOL = "field_bool";
}

View File

@ -7,12 +7,20 @@ import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.RetryUtil;
import com.alibaba.datax.plugin.writer.conn.TSDBConnection;
import com.aliyun.hitsdb.client.TSDB;
import com.aliyun.hitsdb.client.TSDBClientFactory;
import com.aliyun.hitsdb.client.TSDBConfig;
import com.aliyun.hitsdb.client.value.request.MultiFieldPoint;
import com.aliyun.hitsdb.client.value.request.Point;
import com.aliyun.hitsdb.client.value.response.batch.IgnoreErrorsResult;
import com.aliyun.hitsdb.client.value.response.batch.MultiFieldIgnoreErrorsResult;
import com.aliyun.hitsdb.client.value.response.batch.SummaryResult;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;
import java.io.IOException;
import java.util.*;
import java.util.concurrent.Callable;
/**
@ -26,6 +34,9 @@ import java.util.concurrent.Callable;
@SuppressWarnings("unused")
public class TSDBWriter extends Writer {
private static SourceDBType DB_TYPE;
private static TSDB tsdb = null;
public static class Job extends Writer.Job {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
@ -34,33 +45,99 @@ public class TSDBWriter extends Writer {
@Override
public void init() {
this.originalConfig = super.getPluginJobConf();
originalConfig = super.getPluginJobConf();
String address = this.originalConfig.getString(Key.ENDPOINT);
if (StringUtils.isBlank(address)) {
// check source db type
String sourceDbType = originalConfig.getString(Key.SOURCE_DB_TYPE);
if (StringUtils.isBlank(sourceDbType)) {
sourceDbType = SourceDBType.TSDB.name();
originalConfig.set(Key.SOURCE_DB_TYPE, sourceDbType);
LOG.info("The parameter [" + Key.SOURCE_DB_TYPE + "] will be default value: " + SourceDBType.TSDB);
}
try {
DB_TYPE = SourceDBType.valueOf(sourceDbType);
} catch (Exception e) {
throw DataXException.asDataXException(TSDBWriterErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.ENDPOINT + "] is not set.");
"The parameter [" + Key.SOURCE_DB_TYPE +
"] is invalid, which should be one of [" + Arrays.toString(SourceDBType.values()) + "].");
}
Integer batchSize = this.originalConfig.getInt(Key.BATCH_SIZE);
if (batchSize == null || batchSize < 1) {
originalConfig.set(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_SIZE);
LOG.info("The parameter [" + Key.BATCH_SIZE +
"] will be default value: " + Constant.DEFAULT_BATCH_SIZE);
}
// for tsdb
if (DB_TYPE == SourceDBType.TSDB) {
String address = originalConfig.getString(Key.ENDPOINT);
if (StringUtils.isBlank(address)) {
throw DataXException.asDataXException(TSDBWriterErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.ENDPOINT + "] is not set.");
}
Integer retrySize = this.originalConfig.getInt(Key.MAX_RETRY_TIME);
if (retrySize == null || retrySize < 0) {
originalConfig.set(Key.MAX_RETRY_TIME, Constant.DEFAULT_TRY_SIZE);
LOG.info("The parameter [" + Key.MAX_RETRY_TIME +
"] will be default value: " + Constant.DEFAULT_TRY_SIZE);
}
String username = originalConfig.getString(Key.USERNAME, null);
if (StringUtils.isBlank(username)) {
LOG.warn("The parameter [" + Key.USERNAME + "] is blank.");
}
String password = originalConfig.getString(Key.PASSWORD, null);
if (StringUtils.isBlank(password)) {
LOG.warn("The parameter [" + Key.PASSWORD + "] is blank.");
}
Integer batchSize = originalConfig.getInt(Key.BATCH_SIZE);
if (batchSize == null || batchSize < 1) {
originalConfig.set(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_SIZE);
LOG.info("The parameter [" + Key.BATCH_SIZE +
"] will be default value: " + Constant.DEFAULT_BATCH_SIZE);
}
Integer retrySize = originalConfig.getInt(Key.MAX_RETRY_TIME);
if (retrySize == null || retrySize < 0) {
originalConfig.set(Key.MAX_RETRY_TIME, Constant.DEFAULT_TRY_SIZE);
LOG.info("The parameter [" + Key.MAX_RETRY_TIME +
"] will be default value: " + Constant.DEFAULT_TRY_SIZE);
}
Boolean ignoreWriteError = originalConfig.getBool(Key.IGNORE_WRITE_ERROR);
if (ignoreWriteError == null) {
originalConfig.set(Key.IGNORE_WRITE_ERROR, Constant.DEFAULT_IGNORE_WRITE_ERROR);
LOG.info("The parameter [" + Key.IGNORE_WRITE_ERROR +
"] will be default value: " + Constant.DEFAULT_IGNORE_WRITE_ERROR);
}
} else if (DB_TYPE == SourceDBType.RDB) {
// for rdb
originalConfig.getNecessaryValue(Key.ENDPOINT, TSDBWriterErrorCode.REQUIRED_VALUE);
originalConfig.getNecessaryValue(Key.COLUMN_TYPE, TSDBWriterErrorCode.REQUIRED_VALUE);
originalConfig.getNecessaryValue(Key.COLUMN, TSDBWriterErrorCode.REQUIRED_VALUE);
String endpoint = originalConfig.getString(Key.ENDPOINT);
String[] split = endpoint.split(":");
if (split.length != 3) {
throw DataXException.asDataXException(TSDBWriterErrorCode.REQUIRED_VALUE,
"The parameter [" + Key.ENDPOINT + "] is invalid, which should be [http://IP:Port].");
}
String ip = split[1].substring(2);
int port = Integer.parseInt(split[2]);
String username = originalConfig.getString(Key.USERNAME, null);
if (StringUtils.isBlank(username)) {
LOG.warn("The parameter [" + Key.USERNAME + "] is blank.");
}
String password = originalConfig.getString(Key.PASSWORD, null);
if (StringUtils.isBlank(password)) {
LOG.warn("The parameter [" + Key.PASSWORD + "] is blank.");
}
if (!StringUtils.isBlank(password) && !StringUtils.isBlank(username)) {
tsdb = TSDBClientFactory.connect(TSDBConfig.address(ip, port).basicAuth(username, password).config());
} else {
tsdb = TSDBClientFactory.connect(TSDBConfig.address(ip, port).config());
}
String database = originalConfig.getString(Key.DATABASE, null);
if (StringUtils.isBlank(database)) {
LOG.info("The parameter [" + Key.DATABASE + "] is blank.");
} else {
tsdb.useDatabase(database);
}
LOG.info("Tsdb config:" + originalConfig.toJSON());
Boolean ignoreWriteError = this.originalConfig.getBool(Key.IGNORE_WRITE_ERROR);
if (ignoreWriteError == null) {
originalConfig.set(Key.IGNORE_WRITE_ERROR, Constant.DEFAULT_IGNORE_WRITE_ERROR);
LOG.info("The parameter [" + Key.IGNORE_WRITE_ERROR +
"] will be default value: " + Constant.DEFAULT_IGNORE_WRITE_ERROR);
}
}
@ -72,7 +149,7 @@ public class TSDBWriter extends Writer {
public List<Configuration> split(int mandatoryNumber) {
ArrayList<Configuration> configurations = new ArrayList<Configuration>(mandatoryNumber);
for (int i = 0; i < mandatoryNumber; i++) {
configurations.add(this.originalConfig.clone());
configurations.add(originalConfig.clone());
}
return configurations;
}
@ -83,6 +160,14 @@ public class TSDBWriter extends Writer {
@Override
public void destroy() {
if (DB_TYPE == SourceDBType.RDB) {
if (tsdb != null) {
try {
tsdb.close();
} catch (IOException ignored) {
}
}
}
}
}
@ -91,18 +176,87 @@ public class TSDBWriter extends Writer {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private TSDBConnection conn;
private boolean multiField;
private int batchSize;
private int retrySize;
private boolean ignoreWriteError;
private String tableName;
private TSDBConverter tsdbConverter;
@Override
public void init() {
Configuration writerSliceConfig = getPluginJobConf();
String address = writerSliceConfig.getString(Key.ENDPOINT);
this.conn = new TSDBConnection(address);
this.batchSize = writerSliceConfig.getInt(Key.BATCH_SIZE);
this.retrySize = writerSliceConfig.getInt(Key.MAX_RETRY_TIME);
// single field | multi fields
this.multiField = writerSliceConfig.getBool(Key.MULTI_FIELD, false);
this.ignoreWriteError = writerSliceConfig.getBool(Key.IGNORE_WRITE_ERROR);
// for tsdb
if (DB_TYPE == SourceDBType.TSDB) {
String address = writerSliceConfig.getString(Key.ENDPOINT);
String database = writerSliceConfig.getString(Key.DATABASE);
String username = writerSliceConfig.getString(Key.USERNAME);
String password = writerSliceConfig.getString(Key.PASSWORD);
this.conn = new TSDBConnection(address, database, username, password);
this.batchSize = writerSliceConfig.getInt(Key.BATCH_SIZE);
this.retrySize = writerSliceConfig.getInt(Key.MAX_RETRY_TIME);
} else if (DB_TYPE == SourceDBType.RDB) {
// for rdb
int timeSize = 0;
int fieldSize = 0;
int tagSize = 0;
batchSize = writerSliceConfig.getInt(Key.BATCH_SIZE, 100);
List<String> columnName = writerSliceConfig.getList(Key.COLUMN, String.class);
List<String> columnType = writerSliceConfig.getList(Key.COLUMN_TYPE, String.class);
Set<String> typeSet = new HashSet<String>(columnType);
if (columnName.size() != columnType.size()) {
throw DataXException.asDataXException(TSDBWriterErrorCode.ILLEGAL_VALUE,
"The parameter [" + Key.COLUMN_TYPE + "] should has same length with [" + Key.COLUMN + "].");
}
for (String type : columnType) {
if (TSDBModel.TSDB_TAG.equals(type)) {
tagSize ++;
} else if (TSDBModel.TSDB_FIELD_DOUBLE.equals(type) || TSDBModel.TSDB_FIELD_STRING.equals(type)
|| TSDBModel.TSDB_FIELD_BOOL.equals(type)) {
fieldSize++;
} else if (TSDBModel.TSDB_TIMESTAMP.equals(type)) {
timeSize++;
}
}
if (fieldSize == 0) {
// compatible with previous usage of TSDB_METRIC_NUM and TSDB_METRIC_STRING
if (!typeSet.contains(TSDBModel.TSDB_METRIC_NUM) && !typeSet.contains(TSDBModel.TSDB_METRIC_STRING)) {
throw DataXException.asDataXException(TSDBWriterErrorCode.ILLEGAL_VALUE,
"The parameter [" + Key.COLUMN_TYPE + "] is invalid, must set at least one of "
+ TSDBModel.TSDB_FIELD_DOUBLE + ", " + TSDBModel.TSDB_FIELD_STRING + " or " + TSDBModel.TSDB_FIELD_BOOL + ".");
}
}
if (tagSize == 0) {
throw DataXException.asDataXException(TSDBWriterErrorCode.ILLEGAL_VALUE,
"The parameter [" + Key.COLUMN_TYPE + "] is invalid, must set " + TSDBModel.TSDB_TAG + ". ");
}
if (timeSize != 1) {
throw DataXException.asDataXException(TSDBWriterErrorCode.ILLEGAL_VALUE,
"The parameter [" + Key.COLUMN_TYPE + "] is invalid, must set one and only one "
+ TSDBModel.TSDB_TIMESTAMP + ".");
}
if (multiField) {
// check source db type
tableName = writerSliceConfig.getString(Key.TABLE);
if (StringUtils.isBlank(tableName)) {
throw DataXException.asDataXException(TSDBWriterErrorCode.ILLEGAL_VALUE,
"The parameter [" + Key.TABLE + "] h must set when use multi field input.");
}
}
tsdbConverter = new TSDBConverter(columnName, columnType);
}
}
@Override
@ -111,30 +265,52 @@ public class TSDBWriter extends Writer {
@Override
public void startWrite(RecordReceiver recordReceiver) {
try {
Record lastRecord = null;
Record record;
int count = 0;
StringBuilder dps = new StringBuilder();
while ((record = recordReceiver.getFromReader()) != null) {
final int recordLength = record.getColumnNumber();
for (int i = 0; i < recordLength; i++) {
dps.append(record.getColumn(i).asString());
dps.append(",");
count++;
if (count == batchSize) {
count = 0;
batchPut(record, "[" + dps.substring(0, dps.length() - 1) + "]");
dps = new StringBuilder();
// for tsdb
if (DB_TYPE == SourceDBType.TSDB) {
try {
Record lastRecord = null;
Record record;
int count = 0;
StringBuilder dps = new StringBuilder();
while ((record = recordReceiver.getFromReader()) != null) {
final int recordLength = record.getColumnNumber();
for (int i = 0; i < recordLength; i++) {
dps.append(record.getColumn(i).asString());
dps.append(",");
count++;
if (count == batchSize) {
count = 0;
batchPut(record, "[" + dps.substring(0, dps.length() - 1) + "]");
dps = new StringBuilder();
}
}
lastRecord = record;
}
lastRecord = record;
if (StringUtils.isNotBlank(dps.toString())) {
batchPut(lastRecord, "[" + dps.substring(0, dps.length() - 1) + "]");
}
} catch (Exception e) {
throw DataXException.asDataXException(TSDBWriterErrorCode.RUNTIME_EXCEPTION, e);
}
if (StringUtils.isNotBlank(dps.toString())) {
batchPut(lastRecord, "[" + dps.substring(0, dps.length() - 1) + "]");
} else if (DB_TYPE == SourceDBType.RDB) {
// for rdb
List<Record> writerBuffer = new ArrayList<Record>(this.batchSize);
Record record;
long total = 0;
while ((record = recordReceiver.getFromReader()) != null) {
writerBuffer.add(record);
if (writerBuffer.size() >= this.batchSize) {
total += doBatchInsert(writerBuffer);
writerBuffer.clear();
}
}
} catch (Exception e) {
throw DataXException.asDataXException(TSDBWriterErrorCode.RUNTIME_EXCEPTION, e);
if (!writerBuffer.isEmpty()) {
total += doBatchInsert(writerBuffer);
writerBuffer.clear();
}
getTaskPluginCollector().collectMessage("write size", total + "");
LOG.info("Task finished, write size: {}", total);
}
}
@ -143,12 +319,13 @@ public class TSDBWriter extends Writer {
RetryUtil.executeWithRetry(new Callable<Integer>() {
@Override
public Integer call() {
if (!conn.put(dps)) {
getTaskPluginCollector().collectDirtyRecord(record, "Put data points failed!");
throw DataXException.asDataXException(TSDBWriterErrorCode.RUNTIME_EXCEPTION,
"Put data points failed!");
final boolean success = multiField ? conn.mput(dps) : conn.put(dps);
if (success) {
return 0;
}
return 0;
getTaskPluginCollector().collectDirtyRecord(record, "Put data points failed!");
throw DataXException.asDataXException(TSDBWriterErrorCode.RUNTIME_EXCEPTION,
"Put data points failed!");
}
}, retrySize, 60000L, true);
} catch (Exception e) {
@ -160,6 +337,47 @@ public class TSDBWriter extends Writer {
}
}
private long doBatchInsert(final List<Record> writerBuffer) {
int size;
if (ignoreWriteError) {
if (multiField) {
List<MultiFieldPoint> points = tsdbConverter.transRecord2MultiFieldPoint(writerBuffer, tableName);
size = points.size();
MultiFieldIgnoreErrorsResult ignoreErrorsResult = tsdb.multiFieldPutSync(points, MultiFieldIgnoreErrorsResult.class);
if (ignoreErrorsResult == null) {
LOG.error("Unexpected inner error for insert");
} else if (ignoreErrorsResult.getFailed() > 0) {
LOG.error("write TSDB failed num:" + ignoreErrorsResult.getFailed());
}
} else {
List<Point> points = tsdbConverter.transRecord2Point(writerBuffer);
size = points.size();
IgnoreErrorsResult ignoreErrorsResult = tsdb.putSync(points, IgnoreErrorsResult.class);
if (ignoreErrorsResult == null) {
LOG.error("Unexpected inner error for insert");
} else if (ignoreErrorsResult.getFailed() > 0) {
LOG.error("write TSDB failed num:" + ignoreErrorsResult.getFailed());
}
}
} else {
SummaryResult summaryResult;
if (multiField) {
List<MultiFieldPoint> points = tsdbConverter.transRecord2MultiFieldPoint(writerBuffer, tableName);
size = points.size();
summaryResult = tsdb.multiFieldPutSync(points, SummaryResult.class);
} else {
List<Point> points = tsdbConverter.transRecord2Point(writerBuffer);
size = points.size();
summaryResult = tsdb.putSync(points, SummaryResult.class);
}
if (summaryResult.getFailed() > 0) {
LOG.error("write TSDB failed num:" + summaryResult.getFailed());
throw DataXException.asDataXException(TSDBWriterErrorCode.RUNTIME_EXCEPTION, "Write TSDB failed", new Exception());
}
}
return size;
}
@Override
public void post() {
}

View File

@ -13,6 +13,7 @@ import com.alibaba.datax.common.spi.ErrorCode;
public enum TSDBWriterErrorCode implements ErrorCode {
REQUIRED_VALUE("TSDBWriter-00", "Missing the necessary value"),
ILLEGAL_VALUE("TSDBWriter-01", "Illegal value"),
RUNTIME_EXCEPTION("TSDBWriter-01", "Runtime exception"),
RETRY_WRITER_EXCEPTION("TSDBWriter-02", "After repeated attempts, the write still fails");

View File

@ -1,11 +1,14 @@
package com.alibaba.datax.plugin.writer.util;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.fluent.Content;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.ContentType;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Base64;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@ -19,43 +22,44 @@ import java.util.concurrent.TimeUnit;
*/
public final class HttpUtils {
public final static Charset UTF_8 = Charset.forName("UTF-8");
public final static int CONNECT_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60);
public final static int SOCKET_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60);
private static final String CREDENTIALS_FORMAT = "%s:%s";
private static final String BASIC_AUTHENTICATION_FORMAT = "Basic %s";
private HttpUtils() {
}
public static String get(String url) throws Exception {
Content content = Request.Get(url)
public static String get(String url, String username, String password) throws Exception {
final Request request = Request.Get(url)
.connectTimeout(CONNECT_TIMEOUT_DEFAULT_IN_MILL)
.socketTimeout(SOCKET_TIMEOUT_DEFAULT_IN_MILL)
.socketTimeout(SOCKET_TIMEOUT_DEFAULT_IN_MILL);
addAuth(request, username, password);
Content content = request
.execute()
.returnContent();
if (content == null) {
return null;
}
return content.asString(UTF_8);
return content.asString(StandardCharsets.UTF_8);
}
public static String post(String url, Map<String, Object> params) throws Exception {
return post(url, JSON.toJSONString(params), CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
public static String post(String url, String username, String password, Map<String, Object> params) throws Exception {
return post(url, username, password, JSON.toJSONString(params), CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
}
public static String post(String url, String params) throws Exception {
return post(url, params, CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
public static String post(String url, String username, String password, String params) throws Exception {
return post(url, username, password, params, CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
}
public static String post(String url, Map<String, Object> params,
public static String post(String url, String username, String password, String params,
int connectTimeoutInMill, int socketTimeoutInMill) throws Exception {
return post(url, JSON.toJSONString(params), connectTimeoutInMill, socketTimeoutInMill);
}
public static String post(String url, String params,
int connectTimeoutInMill, int socketTimeoutInMill) throws Exception {
Content content = Request.Post(url)
Request request = Request.Post(url)
.connectTimeout(connectTimeoutInMill)
.socketTimeout(socketTimeoutInMill)
.socketTimeout(socketTimeoutInMill);
addAuth(request, username, password);
Content content = request
.addHeader("Content-Type", "application/json")
.bodyString(params, ContentType.APPLICATION_JSON)
.execute()
@ -63,6 +67,22 @@ public final class HttpUtils {
if (content == null) {
return null;
}
return content.asString(UTF_8);
return content.asString(StandardCharsets.UTF_8);
}
private static void addAuth(Request request, String username, String password) {
String authorization = generateHttpAuthorization(username, password);
if (authorization != null) {
request.setHeader("Authorization", authorization);
}
}
private static String generateHttpAuthorization(String username, String password) {
if (StringUtils.isBlank(username) || StringUtils.isBlank(password)) {
return null;
}
String credentials = String.format(CREDENTIALS_FORMAT, username, password);
credentials = Base64.getEncoder().encodeToString(credentials.getBytes());
return String.format(BASIC_AUTHENTICATION_FORMAT, credentials);
}
}

View File

@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.writer.util;
import com.alibaba.datax.plugin.writer.conn.DataPoint4TSDB;
import com.alibaba.fastjson.JSON;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -22,45 +23,56 @@ public final class TSDBUtils {
private TSDBUtils() {
}
public static String version(String address) {
public static String version(String address, String username, String password) {
String url = String.format("%s/api/version", address);
String rsp;
try {
rsp = HttpUtils.get(url);
rsp = HttpUtils.get(url, username, password);
} catch (Exception e) {
throw new RuntimeException(e);
}
return rsp;
}
public static String config(String address) {
public static String config(String address, String username, String password) {
String url = String.format("%s/api/config", address);
String rsp;
try {
rsp = HttpUtils.get(url);
rsp = HttpUtils.get(url, username, password);
} catch (Exception e) {
throw new RuntimeException(e);
}
return rsp;
}
public static boolean put(String address, List<DataPoint4TSDB> dps) {
return put(address, JSON.toJSON(dps));
public static boolean put(String address, String database, String username, String password, List<DataPoint4TSDB> dps) {
return put(address, database, username, password, JSON.toJSON(dps));
}
public static boolean put(String address, DataPoint4TSDB dp) {
return put(address, JSON.toJSON(dp));
public static boolean put(String address, String database, String username, String password, DataPoint4TSDB dp) {
return put(address, database, username, password, JSON.toJSON(dp));
}
private static boolean put(String address, Object o) {
return put(address, o.toString());
private static boolean put(String address, String database, String username, String password, Object o) {
return put(address, database, username, password, o.toString());
}
public static boolean put(String address, String s) {
String url = String.format("%s/api/put", address);
public static boolean put(String address, String database, String username, String password, String s) {
return put(address, database, username, password, s, false);
}
public static boolean mput(String address, String database, String username, String password, String s) {
return put(address, database, username, password, s, true);
}
public static boolean put(String address, String database, String username, String password, String s, boolean multiField) {
String url = address + (multiField ? "/api/mput" : "/api/put");
if (!StringUtils.isBlank(database)) {
url = url.concat("?db=" + database);
}
String rsp;
try {
rsp = HttpUtils.post(url, s);
rsp = HttpUtils.post(url, username, password, s);
// If successful, the returned content should be null.
assert rsp == null;
} catch (Exception e) {