tables = connConfig.getList(Key.TABLE, String.class);
+ ObReaderUtils.escapeDatabaseKeywords(tables);
+ originalConfig.set(String.format("%s[%d].%s", com.alibaba.datax.plugin.rdbms.reader.Constant.CONN_MARK, i, Key.TABLE), tables);
+ }
+ super.init(originalConfig);
}
@Override
diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java
index 073bb3cb..a43dcebd 100644
--- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java
+++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/ext/ReaderTask.java
@@ -1,13 +1,5 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.ext;
-import java.sql.*;
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.plugin.RecordSender;
@@ -19,11 +11,17 @@ import com.alibaba.datax.plugin.rdbms.reader.CommonRdbmsReader;
import com.alibaba.datax.plugin.rdbms.reader.Constant;
import com.alibaba.datax.plugin.rdbms.reader.Key;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
-import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.datax.plugin.rdbms.util.RdbmsException;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.Config;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.ObReaderUtils;
import com.alibaba.datax.plugin.reader.oceanbasev10reader.util.TaskContext;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.util.ArrayList;
+import java.util.List;
public class ReaderTask extends CommonRdbmsReader.Task {
private static final Logger LOG = LoggerFactory.getLogger(ReaderTask.class);
@@ -41,11 +39,12 @@ public class ReaderTask extends CommonRdbmsReader.Task {
private boolean reuseConn = false;
public ReaderTask(int taskGroupId, int taskId) {
- super(ObReaderUtils.DATABASE_TYPE, taskGroupId, taskId);
+ super(ObReaderUtils.databaseType, taskGroupId, taskId);
this.taskGroupId = taskGroupId;
this.taskId = taskId;
}
+ @Override
public void init(Configuration readerSliceConfig) {
/* for database connection */
username = readerSliceConfig.getString(Key.USERNAME);
@@ -54,7 +53,7 @@ public class ReaderTask extends CommonRdbmsReader.Task {
queryTimeoutSeconds = readerSliceConfig.getInt(Config.QUERY_TIMEOUT_SECOND,
Config.DEFAULT_QUERY_TIMEOUT_SECOND);
// ob10的处理
- if(jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) {
+ if (jdbcUrl.startsWith(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING)) {
String[] ss = jdbcUrl.split(com.alibaba.datax.plugin.rdbms.writer.Constant.OB10_SPLIT_STRING_PATTERN);
if (ss.length == 3) {
LOG.info("this is ob1_0 jdbc url.");
@@ -63,16 +62,14 @@ public class ReaderTask extends CommonRdbmsReader.Task {
}
}
- if (ObReaderUtils.DATABASE_TYPE == DataBaseType.OceanBase) {
- jdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:") + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时
+ jdbcUrl = jdbcUrl.replace("jdbc:mysql:", "jdbc:oceanbase:") + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时
+ if (ObReaderUtils.compatibleMode.equals(ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE)) {
compatibleMode = ObReaderUtils.OB_COMPATIBLE_MODE_ORACLE;
- } else {
- jdbcUrl = jdbcUrl + "&socketTimeout=1800000&connectTimeout=60000"; //socketTimeout 半个小时
}
LOG.info("this is ob1_0 jdbc url. user=" + username + " :url=" + jdbcUrl);
mandatoryEncoding = readerSliceConfig.getString(Key.MANDATORY_ENCODING, "");
retryLimit = readerSliceConfig.getInt(Config.RETRY_LIMIT, Config.DEFAULT_RETRY_LIMIT);
- LOG.info("retryLimit: "+ retryLimit);
+ LOG.info("retryLimit: " + retryLimit);
}
private void buildSavePoint(TaskContext context) {
@@ -83,7 +80,6 @@ public class ReaderTask extends CommonRdbmsReader.Task {
}
/**
- *
* 如果isTableMode && table有PK
*
* 则支持断点续读 (若pk不在原始的columns中,则追加到尾部,但不传给下游)
@@ -92,7 +88,7 @@ public class ReaderTask extends CommonRdbmsReader.Task {
*/
@Override
public void startRead(Configuration readerSliceConfig, RecordSender recordSender,
- TaskPluginCollector taskPluginCollector, int fetchSize) {
+ TaskPluginCollector taskPluginCollector, int fetchSize) {
String querySql = readerSliceConfig.getString(Key.QUERY_SQL);
String table = readerSliceConfig.getString(Key.TABLE);
PerfTrace.getInstance().addTaskDetails(taskId, table + "," + jdbcUrl);
@@ -131,14 +127,14 @@ public class ReaderTask extends CommonRdbmsReader.Task {
}
private void startRead0(boolean isTableMode, TaskContext context, RecordSender recordSender,
- TaskPluginCollector taskPluginCollector) {
+ TaskPluginCollector taskPluginCollector) {
// 不是table模式 直接使用原来的做法
if (!isTableMode) {
doRead(recordSender, taskPluginCollector, context);
return;
}
// check primary key index
- Connection conn = DBUtil.getConnection(ObReaderUtils.DATABASE_TYPE, jdbcUrl, username, password);
+ Connection conn = DBUtil.getConnection(ObReaderUtils.databaseType, jdbcUrl, username, password);
ObReaderUtils.initConn4Reader(conn, queryTimeoutSeconds);
context.setConn(conn);
try {
@@ -184,11 +180,11 @@ public class ReaderTask extends CommonRdbmsReader.Task {
}
} catch (Throwable e) {
if (retryLimit == ++retryCount) {
- throw RdbmsException.asQueryException(ObReaderUtils.DATABASE_TYPE, new Exception(e),
+ throw RdbmsException.asQueryException(ObReaderUtils.databaseType, new Exception(e),
context.getQuerySql(), context.getTable(), username);
}
LOG.error("read fail, retry count " + retryCount + ", sleep 60 second, save point:" +
- context.getSavePoint() + ", error: "+ e.getMessage());
+ context.getSavePoint() + ", error: " + e.getMessage());
ObReaderUtils.sleep(60000); // sleep 10s
}
// 假如原来的查询有查出数据,则改成增量查询
@@ -227,7 +223,7 @@ public class ReaderTask extends CommonRdbmsReader.Task {
LOG.info("connection is alive, will reuse this connection.");
} else {
LOG.info("Create new connection for reader.");
- conn = DBUtil.getConnection(ObReaderUtils.DATABASE_TYPE, jdbcUrl, username, password);
+ conn = DBUtil.getConnection(ObReaderUtils.databaseType, jdbcUrl, username, password);
ObReaderUtils.initConn4Reader(conn, queryTimeoutSeconds);
context.setConn(conn);
}
@@ -287,7 +283,7 @@ public class ReaderTask extends CommonRdbmsReader.Task {
ObReaderUtils.close(null, null, context.getConn());
context.setConn(null);
LOG.error("reader data fail", e);
- throw RdbmsException.asQueryException(ObReaderUtils.DATABASE_TYPE, e, context.getQuerySql(),
+ throw RdbmsException.asQueryException(ObReaderUtils.databaseType, e, context.getQuerySql(),
context.getTable(), username);
} finally {
perfRecord.end();
diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java
index 2290fb43..20c2f922 100644
--- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java
+++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/ObReaderUtils.java
@@ -1,52 +1,71 @@
package com.alibaba.datax.plugin.reader.oceanbasev10reader.util;
-import java.sql.Connection;
-import java.sql.PreparedStatement;
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Statement;
-import java.sql.Timestamp;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.TreeMap;
-import java.util.regex.Matcher;
-import java.util.regex.Pattern;
-import java.util.Set;
-import java.util.TreeSet;
-
-import org.apache.commons.lang3.ArrayUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.datax.common.element.BoolColumn;
-import com.alibaba.datax.common.element.BytesColumn;
-import com.alibaba.datax.common.element.Column;
-import com.alibaba.datax.common.element.DateColumn;
-import com.alibaba.datax.common.element.DoubleColumn;
-import com.alibaba.datax.common.element.LongColumn;
-import com.alibaba.datax.common.element.Record;
-import com.alibaba.datax.common.element.StringColumn;
+import com.alibaba.datax.common.element.*;
import com.alibaba.datax.plugin.rdbms.util.DBUtil;
import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
import com.alibaba.druid.sql.SQLUtils;
import com.alibaba.druid.sql.ast.SQLExpr;
import com.alibaba.druid.sql.ast.expr.SQLBinaryOpExpr;
import com.alibaba.druid.sql.ast.expr.SQLBinaryOperator;
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.*;
+import java.util.*;
+import java.util.Map.Entry;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
public class ObReaderUtils {
private static final Logger LOG = LoggerFactory.getLogger(ObReaderUtils.class);
+ private static final String MYSQL_KEYWORDS = "ACCESSIBLE,ACCOUNT,ACTION,ADD,AFTER,AGAINST,AGGREGATE,ALGORITHM,ALL,ALTER,ALWAYS,ANALYSE,AND,ANY,AS,ASC,ASCII,ASENSITIVE,AT,AUTO_INCREMENT,AUTOEXTEND_SIZE,AVG,AVG_ROW_LENGTH,BACKUP,BEFORE,BEGIN,BETWEEN,BIGINT,BINARY,BINLOG,BIT,BLOB,BLOCK,BOOL,BOOLEAN,BOTH,BTREE,BY,BYTE,CACHE,CALL,CASCADE,CASCADED,CASE,CATALOG_NAME,CHAIN,CHANGE,CHANGED,CHANNEL,CHAR,CHARACTER,CHARSET,CHECK,CHECKSUM,CIPHER,CLASS_ORIGIN,CLIENT,CLOSE,COALESCE,CODE,COLLATE,COLLATION,COLUMN,COLUMN_FORMAT,COLUMN_NAME,COLUMNS,COMMENT,COMMIT,COMMITTED,COMPACT,COMPLETION,COMPRESSED,COMPRESSION,CONCURRENT,CONDITION,CONNECTION,CONSISTENT,CONSTRAINT,CONSTRAINT_CATALOG,CONSTRAINT_NAME,CONSTRAINT_SCHEMA,CONTAINS,CONTEXT,CONTINUE,CONVERT,CPU,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,CURSOR,CURSOR_NAME,DATA,DATABASE,DATABASES,DATAFILE,DATE,DATETIME,DAY,DAY_HOUR,DAY_MICROSECOND,DAY_MINUTE,DAY_SECOND,DEALLOCATE,DEC,DECIMAL,DECLARE,DEFAULT,DEFAULT_AUTH,DEFINER,DELAY_KEY_WRITE,DELAYED,DELETE,DES_KEY_FILE,DESC,DESCRIBE,DETERMINISTIC,DIAGNOSTICS,DIRECTORY,DISABLE,DISCARD,DISK,DISTINCT,DISTINCTROW,DIV,DO,DOUBLE,DROP,DUAL,DUMPFILE,DUPLICATE,DYNAMIC,EACH,ELSE,ELSEIF,ENABLE,ENCLOSED,ENCRYPTION,END,ENDS,ENGINE,ENGINES,ENUM,ERROR,ERRORS,ESCAPE,ESCAPED,EVENT,EVENTS,EVERY,EXCHANGE,EXECUTE,EXISTS,EXIT,EXPANSION,EXPIRE,EXPLAIN,EXPORT,EXTENDED,EXTENT_SIZE,FAST,FAULTS,FETCH,FIELDS,FILE,FILE_BLOCK_SIZE,FILTER,FIRST,FIXED,FLOAT,FLOAT4,FLOAT8,FLUSH,FOLLOWS,FOR,FORCE,FOREIGN,FORMAT,FOUND,FROM,FULL,FULLTEXT,FUNCTION,GENERAL,GENERATED,GEOMETRY,GEOMETRYCOLLECTION,GET,GET_FORMAT,GLOBAL,GRANT,GRANTS,GROUP,GROUP_REPLICATION,HANDLER,HASH,HAVING,HELP,HIGH_PRIORITY,HOST,HOSTS,HOUR,HOUR_MICROSECOND,HOUR_MINUTE,HOUR_SECOND,IDENTIFIED,IF,IGNORE,IGNORE_SERVER_IDS,IMPORT,IN,INDEX,INDEXES,INFILE,INITIAL_SIZE,INNER,INOUT,INSENSITIVE,INSERT,INSERT_METHOD,INSTALL,INSTANCE,INT,INT1,INT2,INT3,INT4,INT8,INTEGER,INTERVAL,INTO,INVOKER,IO,IO_AFTER_GTIDS,IO_BEFORE_GTIDS,IO_THREAD,IPC,IS,ISOLATION,ISSUER,ITERATE,JOIN,JSON,KEY,KEY_BLOCK_SIZE,KEYS,KILL,LANGUAGE,LAST,LEADING,LEAVE,LEAVES,LEFT,LESS,LEVEL,LIKE,LIMIT,LINEAR,LINES,LINESTRING,LIST,LOAD,LOCAL,LOCALTIME,LOCALTIMESTAMP,LOCK,LOCKS,LOGFILE,LOGS,LONG,LONGBLOB,LONGTEXT,LOOP,LOW_PRIORITY,MASTER,MASTER_AUTO_POSITION,MASTER_BIND,MASTER_CONNECT_RETRY,MASTER_DELAY,MASTER_HEARTBEAT_PERIOD,MASTER_HOST,MASTER_LOG_FILE,MASTER_LOG_POS,MASTER_PASSWORD,MASTER_PORT,MASTER_RETRY_COUNT,MASTER_SERVER_ID,MASTER_SSL,MASTER_SSL_CA,MASTER_SSL_CAPATH,MASTER_SSL_CERT,MASTER_SSL_CIPHER,MASTER_SSL_CRL,MASTER_SSL_CRLPATH,MASTER_SSL_KEY,MASTER_SSL_VERIFY_SERVER_CERT,MASTER_TLS_VERSION,MASTER_USER,MATCH,MAX_CONNECTIONS_PER_HOUR,MAX_QUERIES_PER_HOUR,MAX_ROWS,MAX_SIZE,MAX_STATEMENT_TIME,MAX_UPDATES_PER_HOUR,MAX_USER_CONNECTIONS,MAXVALUE,MEDIUM,MEDIUMBLOB,MEDIUMINT,MEDIUMTEXT,MEMORY,MERGE,MESSAGE_TEXT,MICROSECOND,MIDDLEINT,MIGRATE,MIN_ROWS,MINUTE,MINUTE_MICROSECOND,MINUTE_SECOND,MOD,MODE,MODIFIES,MODIFY,MONTH,MULTILINESTRING,MULTIPOINT,MULTIPOLYGON,MUTEX,MYSQL_ERRNO,NAME,NAMES,NATIONAL,NATURAL,NCHAR,NDB,NDBCLUSTER,NEVER,NEW,NEXT,NO,NO_WAIT,NO_WRITE_TO_BINLOG,NODEGROUP,NONBLOCKING,NONE,NOT,NULL,NUMBER,NUMERIC,NVARCHAR,OFFSET,OLD_PASSWORD,ON,ONE,ONLY,OPEN,OPTIMIZE,OPTIMIZER_COSTS,OPTION,OPTIONALLY,OPTIONS,OR,ORDER,OUT,OUTER,OUTFILE,OWNER,PACK_KEYS,PAGE,PARSE_GCOL_EXPR,PARSER,PARTIAL,PARTITION,PARTITIONING,PARTITIONS,PASSWORD,PHASE,PLUGIN,PLUGIN_DIR,PLUGINS,POINT,POLYGON,PORT,PRECEDES,PRECISION,PREPARE,PRESERVE,PREV,PRIMARY,PRIVILEGES,PROCEDURE,PROCESSLIST,PROFILE,PROFILES,PROXY,PURGE,QUARTER,QUERY,QUICK,RANGE,READ,READ_ONLY,READ_WRITE,READS,REAL,REBUILD,RECOVER,REDO_BUFFER_SIZE,REDOFILE,REDUNDANT,REFERENCES,REGEXP,RELAY,RELAY_LOG_FILE,RELAY_LOG_POS,RELAY_THREAD,RELAYLOG,RELEASE,RELOAD,REMOVE,RENAME,REORGANIZE,REPAIR,REPEAT,REPEATABLE,REPLACE,REPLICATE_DO_DB,REPLICATE_DO_TABLE,REPLICATE_IGNORE_DB,REPLICATE_IGNORE_TABLE,REPLICATE_REWRITE_DB,REPLICATE_WILD_DO_TABLE,REPLICATE_WILD_IGNORE_TABLE,REPLICATION,REQUIRE,RESET,RESIGNAL,RESTORE,RESTRICT,RESUME,RETURN,RETURNED_SQLSTATE,RETURNS,REVERSE,REVOKE,RIGHT,RLIKE,ROLLBACK,ROLLUP,ROTATE,ROUTINE,ROW,ROW_COUNT,ROW_FORMAT,ROWS,RTREE,SAVEPOINT,SCHEDULE,SCHEMA,SCHEMA_NAME,SCHEMAS,SECOND,SECOND_MICROSECOND,SECURITY,SELECT,SENSITIVE,SEPARATOR,SERIAL,SERIALIZABLE,SERVER,SESSION,SET,SHARE,SHOW,SHUTDOWN,SIGNAL,SIGNED,SIMPLE,SLAVE,SLOW,SMALLINT,SNAPSHOT,SOCKET,SOME,SONAME,SOUNDS,SOURCE,SPATIAL,SPECIFIC,SQL,SQL_AFTER_GTIDS,SQL_AFTER_MTS_GAPS,SQL_BEFORE_GTIDS,SQL_BIG_RESULT,SQL_BUFFER_RESULT,SQL_CACHE,SQL_CALC_FOUND_ROWS,SQL_NO_CACHE,SQL_SMALL_RESULT,SQL_THREAD,SQL_TSI_DAY,SQL_TSI_HOUR,SQL_TSI_MINUTE,SQL_TSI_MONTH,SQL_TSI_QUARTER,SQL_TSI_SECOND,SQL_TSI_WEEK,SQL_TSI_YEAR,SQLEXCEPTION,SQLSTATE,SQLWARNING,SSL,STACKED,START,STARTING,STARTS,STATS_AUTO_RECALC,STATS_PERSISTENT,STATS_SAMPLE_PAGES,STATUS,STOP,STORAGE,STORED,STRAIGHT_JOIN,STRING,SUBCLASS_ORIGIN,SUBJECT,SUBPARTITION,SUBPARTITIONS,SUPER,SUSPEND,SWAPS,SWITCHES,TABLE,TABLE_CHECKSUM,TABLE_NAME,TABLES,TABLESPACE,TEMPORARY,TEMPTABLE,TERMINATED,TEXT,THAN,THEN,TIME,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TINYBLOB,TINYINT,TINYTEXT,TO,TRAILING,TRANSACTION,TRIGGER,TRIGGERS,TRUNCATE,TYPE,TYPES,UNCOMMITTED,UNDEFINED,UNDO,UNDO_BUFFER_SIZE,UNDOFILE,UNICODE,UNINSTALL,UNION,UNIQUE,UNKNOWN,UNLOCK,UNSIGNED,UNTIL,UPDATE,UPGRADE,USAGE,USE,USE_FRM,USER,USER_RESOURCES,USING,UTC_DATE,UTC_TIME,UTC_TIMESTAMP,VALIDATION,VALUE,VALUES,VARBINARY,VARCHAR,VARCHARACTER,VARIABLES,VARYING,VIEW,VIRTUAL,WAIT,WARNINGS,WEEK,WEIGHT_STRING,WHEN,WHERE,WHILE,WITH,WITHOUT,WORK,WRAPPER,WRITE,X509,XA,XID,XML,XOR,YEAR,YEAR_MONTH,ZEROFILL,FALSE,TRUE";
+ private static final String ORACLE_KEYWORDS = "ACCESS,ADD,ALL,ALTER,AND,ANY,ARRAYLEN,AS,ASC,AUDIT,BETWEEN,BY,CHAR,CHECK,CLUSTER,COLUMN,COMMENT,COMPRESS,CONNECT,CREATE,CURRENT,DATE,DECIMAL,DEFAULT,DELETE,DESC,DISTINCT,DROP,ELSE,EXCLUSIVE,EXISTS,FILE,FLOAT,FOR,FROM,GRANT,GROUP,HAVING,IDENTIFIED,IMMEDIATE,IN,INCREMENT,INDEX,INITIAL,INSERT,INTEGER,INTERSECT,INTO,IS,LEVEL,LIKE,LOCK,LONG,MAXEXTENTS,MINUS,MODE,MODIFY,NOAUDIT,NOCOMPRESS,NOT,NOTFOUND,NOWAIT,NULL,NUMBER,OF,OFFLINE,ON,ONLINE,OPTION,OR,ORDER,PCTFREE,PRIOR,PRIVILEGES,PUBLIC,RAW,RENAME,RESOURCE,REVOKE,ROW,ROWID,ROWLABEL,ROWNUM,ROWS,SELECT,SESSION,SET,SHARE,SIZE,SMALLINT,SQLBUF,START,SUCCESSFUL,SYNONYM,TABLE,THEN,TO,TRIGGER,UID,UNION,UNIQUE,UPDATE,USER,VALIDATE,VALUES,VARCHAR,VARCHAR2,VIEW,WHENEVER,WHERE,WITH";
+ private static Set databaseKeywords;
final static public String OB_COMPATIBLE_MODE = "obCompatibilityMode";
final static public String OB_COMPATIBLE_MODE_ORACLE = "ORACLE";
final static public String OB_COMPATIBLE_MODE_MYSQL = "MYSQL";
- public static DataBaseType DATABASE_TYPE = DataBaseType.MySql;
+ public static String compatibleMode = OB_COMPATIBLE_MODE_MYSQL;
+
+ public static final DataBaseType databaseType = DataBaseType.OceanBase;
+
+
+ private static Set keywordsFromString2HashSet(final String keywords) {
+ return new HashSet(Arrays.asList(keywords.split(",")));
+ }
+
+ public static String escapeDatabaseKeywords(String keyword) {
+ if (databaseKeywords == null) {
+ if (isOracleMode(compatibleMode)) {
+ databaseKeywords = keywordsFromString2HashSet(ORACLE_KEYWORDS);
+ } else {
+ databaseKeywords = keywordsFromString2HashSet(MYSQL_KEYWORDS);
+ }
+ }
+ char escapeChar = isOracleMode(compatibleMode) ? '"' : '`';
+ if (databaseKeywords.contains(keyword.toUpperCase())) {
+ keyword = escapeChar + keyword + escapeChar;
+ }
+ return keyword;
+ }
+
+ public static void escapeDatabaseKeywords(List keywords) {
+ for (int i = 0; i < keywords.size(); i++) {
+ keywords.set(i, escapeDatabaseKeywords(keywords.get(i)));
+ }
+ }
+
+ public static Boolean isEscapeMode(String keyword) {
+ if (isOracleMode(compatibleMode)) {
+ return keyword.startsWith("\"") && keyword.endsWith("\"");
+ } else {
+ return keyword.startsWith("`") && keyword.endsWith("`");
+ }
+ }
public static void initConn4Reader(Connection conn, long queryTimeoutSeconds) {
String setQueryTimeout = "set ob_query_timeout=" + (queryTimeoutSeconds * 1000 * 1000L);
@@ -57,7 +76,7 @@ public class ObReaderUtils {
stmt = conn.createStatement();
stmt.execute(setQueryTimeout);
stmt.execute(setTrxTimeout);
- LOG.warn("setAutoCommit=true;"+setQueryTimeout+";"+setTrxTimeout+";");
+ LOG.warn("setAutoCommit=true;" + setQueryTimeout + ";" + setTrxTimeout + ";");
} catch (Throwable e) {
LOG.warn("initConn4Reader fail", e);
} finally {
@@ -73,7 +92,6 @@ public class ObReaderUtils {
}
/**
- *
* @param conn
* @param context
*/
@@ -84,8 +102,11 @@ public class ObReaderUtils {
return;
}
List columns = context.getColumns();
+
// 最后参与排序的索引列
+
context.setPkColumns(pkColumns);
+
int[] pkIndexs = new int[pkColumns.length];
for (int i = 0, n = pkColumns.length; i < n; i++) {
String pkc = pkColumns[i];
@@ -112,9 +133,9 @@ public class ObReaderUtils {
String sql = "show index from " + tableName + " where Key_name='PRIMARY'";
if (isOracleMode(context.getCompatibleMode())) {
tableName = tableName.toUpperCase();
- sql = "SELECT cols.column_name Column_name "+
+ sql = "SELECT cols.column_name Column_name " +
"FROM all_constraints cons, all_cons_columns cols " +
- "WHERE cols.table_name = '" + tableName+ "' AND cons.constraint_type = 'P' " +
+ "WHERE cols.table_name = '" + tableName + "' AND cons.constraint_type = 'P' " +
"AND cons.constraint_name = cols.constraint_name AND cons.owner = cols.owner";
}
LOG.info("get primary key by sql: " + sql);
@@ -126,11 +147,16 @@ public class ObReaderUtils {
ps = conn.createStatement();
rs = ps.executeQuery(sql);
while (rs.next()) {
- String columnName = StringUtils.lowerCase(rs.getString("Column_name"));
+ String columnName = rs.getString("Column_name");
+ columnName = escapeDatabaseKeywords(columnName);
+ if (!isEscapeMode(columnName)) {
+ columnName.toLowerCase();
+ }
if (!realIndex.contains(columnName)) {
realIndex.add(columnName);
}
}
+
String[] pks = new String[realIndex.size()];
realIndex.toArray(pks);
return pks;
@@ -156,7 +182,7 @@ public class ObReaderUtils {
if (StringUtils.isNotEmpty(indexName)) {
String weakReadHint = weakRead ? "+READ_CONSISTENCY(WEAK)," : "+";
sql += " /*" + weakReadHint + "index(" + context.getTable() + " " + indexName + ")*/ ";
- } else if (weakRead){
+ } else if (weakRead) {
sql += " /*+READ_CONSISTENCY(WEAK)*/ ";
}
sql += StringUtils.join(context.getColumns(), ',');
@@ -187,7 +213,6 @@ public class ObReaderUtils {
* 增量查的SQL
*
* @param conn
- *
* @param context
* @return sql
*/
@@ -197,8 +222,8 @@ public class ObReaderUtils {
String sql = "select ";
if (StringUtils.isNotEmpty(indexName)) {
String weakReadHint = weakRead ? "+READ_CONSISTENCY(WEAK)," : "+";
- sql += " /*"+ weakReadHint + "index(" + context.getTable() + " " + indexName + ")*/ ";
- } else if (weakRead){
+ sql += " /*" + weakReadHint + "index(" + context.getTable() + " " + indexName + ")*/ ";
+ } else if (weakRead) {
sql += " /*+READ_CONSISTENCY(WEAK)*/ ";
}
sql += StringUtils.join(context.getColumns(), ',') + " from " + context.getTable();
@@ -295,7 +320,7 @@ public class ObReaderUtils {
final char rightBracket = ')';
if (str != null && str.contains(String.valueOf(leftBracket)) && str.contains(String.valueOf(rightBracket)) &&
str.indexOf(leftBracket) < str.indexOf(rightBracket)) {
- return str.substring(str.indexOf(leftBracket)+1, str.indexOf(rightBracket));
+ return str.substring(str.indexOf(leftBracket) + 1, str.indexOf(rightBracket));
}
return str;
}
@@ -364,7 +389,7 @@ public class ObReaderUtils {
/**
* 找出where条件中的列名,目前仅支持全部为and条件,并且操作符为大于、大约等于、等于、小于、小于等于和不等于的表达式。
- *
+ *
* test coverage: - c6 = 20180710 OR c4 = 320: no index selected - 20180710
* = c6: correct index selected - 20180710 = c6 and c4 = 320 or c2 < 100: no
* index selected
@@ -416,17 +441,17 @@ public class ObReaderUtils {
if (isOracleMode(compatibleMode)) {
tableName = tableName.toUpperCase();
sql = "SELECT INDEX_NAME Key_name, COLUMN_NAME Column_name " +
- "from dba_ind_columns where TABLE_NAME = '" + tableName +"' " +
+ "from dba_ind_columns where TABLE_NAME = '" + tableName + "' " +
" union all " +
- "SELECT DISTINCT " +
- "CASE " +
- "WHEN cons.CONSTRAINT_TYPE = 'P' THEN 'PRIMARY' " +
- "WHEN cons.CONSTRAINT_TYPE = 'U' THEN cons.CONSTRAINT_NAME " +
- "ELSE '' " +
- "END AS Key_name, " +
- "cols.column_name Column_name " +
- "FROM all_constraints cons, all_cons_columns cols " +
- "WHERE cols.table_name = '" + tableName + "' AND cons.constraint_type in('P', 'U') " +
+ "SELECT DISTINCT " +
+ "CASE " +
+ "WHEN cons.CONSTRAINT_TYPE = 'P' THEN 'PRIMARY' " +
+ "WHEN cons.CONSTRAINT_TYPE = 'U' THEN cons.CONSTRAINT_NAME " +
+ "ELSE '' " +
+ "END AS Key_name, " +
+ "cols.column_name Column_name " +
+ "FROM all_constraints cons, all_cons_columns cols " +
+ "WHERE cols.table_name = '" + tableName + "' AND cons.constraint_type in('P', 'U') " +
"AND cons.constraint_name = cols.constraint_name AND cons.owner = cols.owner";
}
Statement stmt = null;
@@ -469,14 +494,13 @@ public class ObReaderUtils {
}
/**
- *
* @param conn
* @param table
* @param colNamesInCondition
* @return
*/
private static List getIndexName(Connection conn, String table,
- Set colNamesInCondition, String compatibleMode) {
+ Set colNamesInCondition, String compatibleMode) {
List indexNames = new ArrayList();
if (colNamesInCondition == null || colNamesInCondition.size() == 0) {
LOG.info("there is no qulified conditions in the where clause, skip index selection.");
@@ -540,7 +564,7 @@ public class ObReaderUtils {
Map index = new TreeMap();
List columnList = allIndexInTab.get(indexName);
for (int i = 1; i <= columnList.size(); i++) {
- index.put(i, columnList.get(i-1));
+ index.put(i, columnList.get(i - 1));
}
allIndexs.put(indexName, index);
} else {
@@ -644,19 +668,19 @@ public class ObReaderUtils {
public static void binding(PreparedStatement ps, List list) throws SQLException {
for (int i = 0, n = list.size(); i < n; i++) {
Column c = list.get(i);
- if(c instanceof BoolColumn){
- ps.setLong(i + 1, ((BoolColumn)c).asLong());
- }else if(c instanceof BytesColumn){
- ps.setBytes(i + 1, ((BytesColumn)c).asBytes());
- }else if(c instanceof DateColumn){
- ps.setTimestamp(i + 1, new Timestamp(((DateColumn)c).asDate().getTime()));
- }else if(c instanceof DoubleColumn){
- ps.setDouble(i + 1, ((DoubleColumn)c).asDouble());
- }else if(c instanceof LongColumn){
- ps.setLong(i + 1, ((LongColumn)c).asLong());
- }else if(c instanceof StringColumn){
- ps.setString(i + 1, ((StringColumn)c).asString());
- }else{
+ if (c instanceof BoolColumn) {
+ ps.setLong(i + 1, ((BoolColumn) c).asLong());
+ } else if (c instanceof BytesColumn) {
+ ps.setBytes(i + 1, ((BytesColumn) c).asBytes());
+ } else if (c instanceof DateColumn) {
+ ps.setTimestamp(i + 1, new Timestamp(((DateColumn) c).asDate().getTime()));
+ } else if (c instanceof DoubleColumn) {
+ ps.setDouble(i + 1, ((DoubleColumn) c).asDouble());
+ } else if (c instanceof LongColumn) {
+ ps.setLong(i + 1, ((LongColumn) c).asLong());
+ } else if (c instanceof StringColumn) {
+ ps.setString(i + 1, ((StringColumn) c).asString());
+ } else {
ps.setObject(i + 1, c.getRawData());
}
}
@@ -692,6 +716,6 @@ public class ObReaderUtils {
}
public static boolean isOracleMode(String mode) {
- return (mode != null && OB_COMPATIBLE_MODE_ORACLE.equals(mode));
+ return (mode != null && OB_COMPATIBLE_MODE_ORACLE.equalsIgnoreCase(mode));
}
}
diff --git a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/TaskContext.java b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/TaskContext.java
index ba754a37..17655a52 100644
--- a/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/TaskContext.java
+++ b/oceanbasev10reader/src/main/java/com/alibaba/datax/plugin/reader/oceanbasev10reader/util/TaskContext.java
@@ -162,6 +162,7 @@ public class TaskContext {
public String getUserSavePoint() {
return userSavePoint;
}
+
public void setUserSavePoint(String userSavePoint) {
this.userSavePoint = userSavePoint;
}
diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java
index 4ffaffed..ede2eb01 100644
--- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java
+++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/OceanBaseV10Writer.java
@@ -1,15 +1,5 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer;
-import java.sql.*;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.TimeUnit;
-
-import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.DbUtils;
-import org.apache.commons.lang3.StringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
@@ -20,7 +10,16 @@ import com.alibaba.datax.plugin.rdbms.writer.Constant;
import com.alibaba.datax.plugin.rdbms.writer.Key;
import com.alibaba.datax.plugin.rdbms.writer.util.WriterUtil;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.task.ConcurrentTableWriterTask;
+import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.DbUtils;
import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
+import com.alibaba.fastjson.JSONObject;
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.sql.Connection;
+import java.util.ArrayList;
+import java.util.List;
/**
* 2016-04-07
@@ -60,6 +59,19 @@ public class OceanBaseV10Writer extends Writer {
public void init() {
this.originalConfig = super.getPluginJobConf();
checkCompatibleMode(originalConfig);
+ //将config中的column和table中的关键字进行转义
+ List columns = originalConfig.getList(Key.COLUMN, String.class);
+ ObWriterUtils.escapeDatabaseKeywords(columns);
+ originalConfig.set(Key.COLUMN, columns);
+
+ List conns = originalConfig.getList(Constant.CONN_MARK, JSONObject.class);
+ for (int i = 0; i < conns.size(); i++) {
+ JSONObject conn = conns.get(i);
+ Configuration connConfig = Configuration.from(conn.toString());
+ List tables = connConfig.getList(Key.TABLE, String.class);
+ ObWriterUtils.escapeDatabaseKeywords(tables);
+ originalConfig.set(String.format("%s[%d].%s", Constant.CONN_MARK, i, Key.TABLE), tables);
+ }
this.commonJob = new CommonRdbmsWriter.Job(DATABASE_TYPE);
this.commonJob.init(this.originalConfig);
}
@@ -222,6 +234,7 @@ public class OceanBaseV10Writer extends Writer {
/**
* 注意:此方法每个 Task 都会执行一次。 最佳实践:此处适当封装确保简洁清晰完成数据写入工作。
*/
+ @Override
public void startWrite(RecordReceiver recordReceiver) {
this.writerTask.startWrite(recordReceiver, this.writerSliceConfig, super.getTaskPluginCollector());
}
diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java
index cbc9a936..e6b4a561 100644
--- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java
+++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/task/ConcurrentTableWriterTask.java
@@ -1,7 +1,27 @@
package com.alibaba.datax.plugin.writer.oceanbasev10writer.task;
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordReceiver;
+import com.alibaba.datax.common.plugin.TaskPluginCollector;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.datax.plugin.rdbms.util.DBUtil;
+import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
+import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
+import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
+import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config;
+import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ConnHolder;
+import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder;
+import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo;
+import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
+import com.alipay.oceanbase.obproxy.data.TableEntryKey;
+import com.alipay.oceanbase.obproxy.util.ObPartitionIdCalculator;
+import org.apache.commons.lang3.tuple.Pair;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
import java.sql.Connection;
-//import java.sql.PreparedStatement;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.ArrayList;
@@ -16,27 +36,7 @@ import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
-import com.alibaba.datax.common.element.Column;
-import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ObClientConnHolder;
-import org.apache.commons.lang3.tuple.Pair;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.alibaba.datax.common.element.Record;
-import com.alibaba.datax.common.exception.DataXException;
-import com.alibaba.datax.common.plugin.RecordReceiver;
-import com.alibaba.datax.common.plugin.TaskPluginCollector;
-import com.alibaba.datax.common.util.Configuration;
-import com.alibaba.datax.plugin.rdbms.util.DBUtil;
-import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
-import com.alibaba.datax.plugin.rdbms.util.DataBaseType;
-import com.alibaba.datax.plugin.rdbms.writer.CommonRdbmsWriter;
-import com.alibaba.datax.plugin.writer.oceanbasev10writer.Config;
-import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ConnHolder;
-import com.alibaba.datax.plugin.writer.oceanbasev10writer.ext.ServerConnectInfo;
-import com.alibaba.datax.plugin.writer.oceanbasev10writer.util.ObWriterUtils;
-import com.alipay.oceanbase.obproxy.data.TableEntryKey;
-import com.alipay.oceanbase.obproxy.util.ObPartitionIdCalculator;
+//import java.sql.PreparedStatement;
public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
private static final Logger LOG = LoggerFactory.getLogger(ConcurrentTableWriterTask.class);
@@ -62,6 +62,7 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
private ObPartitionIdCalculator partCalculator = null;
private HashMap> groupInsertValues;
+ List unknownPartRecords = new ArrayList();
// private List unknownPartRecords;
private List partitionKeyIndexes;
@@ -104,10 +105,14 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
connectInfo.getFullUserName(), connectInfo.password);
checkConnHolder.initConnection();
if (isOracleCompatibleMode) {
- connectInfo.databaseName = connectInfo.databaseName.toUpperCase();
- table = table.toUpperCase();
- LOG.info(String.format("this is oracle compatible mode, change database to %s, table to %s",
- connectInfo.databaseName, table));
+ connectInfo.databaseName = connectInfo.databaseName.toUpperCase();
+ //在转义的情况下不翻译
+ if (!(table.startsWith("\"") && table.endsWith("\""))) {
+ table = table.toUpperCase();
+ }
+
+ LOG.info(String.format("this is oracle compatible mode, change database to %s, table to %s",
+ connectInfo.databaseName, table));
}
if (config.getBool(Config.USE_PART_CALCULATOR, Config.DEFAULT_USE_PART_CALCULATOR)) {
@@ -289,20 +294,15 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
}
private void addLeftRecords() {
+ //不需要刷新Cache,已经是最后一批数据了
for (List groupValues : groupInsertValues.values()) {
if (groupValues.size() > 0 ) {
- int retry = 0;
- while (true) {
- try {
- concurrentWriter.addBatchRecords(groupValues);
- break;
- } catch (InterruptedException e) {
- retry++;
- LOG.info("Concurrent table writer is interrupted, retry {}", retry);
- }
- }
+ addRecordsToWriteQueue(groupValues);
}
}
+ if (unknownPartRecords.size() > 0) {
+ addRecordsToWriteQueue(unknownPartRecords);
+ }
}
private void addRecordToCache(final Record record) {
@@ -326,41 +326,40 @@ public class ConcurrentTableWriterTask extends CommonRdbmsWriter.Task {
}
groupValues.add(record);
if (groupValues.size() >= batchSize) {
- int i = 0;
- while (true) {
- if (i > 0) {
- LOG.info("retry add batch record the {} times", i);
- }
- try {
- concurrentWriter.addBatchRecords(groupValues);
- printEveryTime();
- break;
- } catch (InterruptedException e) {
- LOG.info("Concurrent table writer is interrupted");
- }
- }
- groupValues = new ArrayList(batchSize);
+ groupValues = addRecordsToWriteQueue(groupValues);
groupInsertValues.put(partId, groupValues);
}
} else {
- LOG.warn("add unknown part record {}", record);
- List unknownPartRecords = new ArrayList();
+ LOG.debug("add unknown part record {}", record);
unknownPartRecords.add(record);
- int i = 0;
- while (true) {
- if (i > 0) {
- LOG.info("retry add batch record the {} times", i);
- }
- try {
- concurrentWriter.addBatchRecords(unknownPartRecords);
- break;
- } catch (InterruptedException e) {
- LOG.info("Concurrent table writer is interrupted");
- }
+ if (unknownPartRecords.size() >= batchSize) {
+ unknownPartRecords = addRecordsToWriteQueue(unknownPartRecords);
}
+
}
}
+ /**
+ *
+ * @param records
+ * @return 返回一个新的Cache用于存储接下来的数据
+ */
+ private List addRecordsToWriteQueue(List records) {
+ int i = 0;
+ while (true) {
+ if (i > 0) {
+ LOG.info("retry add batch record the {} times", i);
+ }
+ try {
+ concurrentWriter.addBatchRecords(records);
+ break;
+ } catch (InterruptedException e) {
+ i++;
+ LOG.info("Concurrent table writer is interrupted");
+ }
+ }
+ return new ArrayList(batchSize);
+ }
private void checkMemStore() {
Connection checkConn = checkConnHolder.reconnect();
long now = System.currentTimeMillis();
diff --git a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java
index 368c3d17..ff1648a1 100644
--- a/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java
+++ b/oceanbasev10writer/src/main/java/com/alibaba/datax/plugin/writer/oceanbasev10writer/util/ObWriterUtils.java
@@ -11,15 +11,47 @@ import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.*;
-import java.util.stream.Collectors;
public class ObWriterUtils {
- protected static final Logger LOG = LoggerFactory.getLogger(Task.class);
+
+ private static final String MYSQL_KEYWORDS = "ACCESSIBLE,ACCOUNT,ACTION,ADD,AFTER,AGAINST,AGGREGATE,ALGORITHM,ALL,ALTER,ALWAYS,ANALYSE,AND,ANY,AS,ASC,ASCII,ASENSITIVE,AT,AUTO_INCREMENT,AUTOEXTEND_SIZE,AVG,AVG_ROW_LENGTH,BACKUP,BEFORE,BEGIN,BETWEEN,BIGINT,BINARY,BINLOG,BIT,BLOB,BLOCK,BOOL,BOOLEAN,BOTH,BTREE,BY,BYTE,CACHE,CALL,CASCADE,CASCADED,CASE,CATALOG_NAME,CHAIN,CHANGE,CHANGED,CHANNEL,CHAR,CHARACTER,CHARSET,CHECK,CHECKSUM,CIPHER,CLASS_ORIGIN,CLIENT,CLOSE,COALESCE,CODE,COLLATE,COLLATION,COLUMN,COLUMN_FORMAT,COLUMN_NAME,COLUMNS,COMMENT,COMMIT,COMMITTED,COMPACT,COMPLETION,COMPRESSED,COMPRESSION,CONCURRENT,CONDITION,CONNECTION,CONSISTENT,CONSTRAINT,CONSTRAINT_CATALOG,CONSTRAINT_NAME,CONSTRAINT_SCHEMA,CONTAINS,CONTEXT,CONTINUE,CONVERT,CPU,CREATE,CROSS,CUBE,CURRENT,CURRENT_DATE,CURRENT_TIME,CURRENT_TIMESTAMP,CURRENT_USER,CURSOR,CURSOR_NAME,DATA,DATABASE,DATABASES,DATAFILE,DATE,DATETIME,DAY,DAY_HOUR,DAY_MICROSECOND,DAY_MINUTE,DAY_SECOND,DEALLOCATE,DEC,DECIMAL,DECLARE,DEFAULT,DEFAULT_AUTH,DEFINER,DELAY_KEY_WRITE,DELAYED,DELETE,DES_KEY_FILE,DESC,DESCRIBE,DETERMINISTIC,DIAGNOSTICS,DIRECTORY,DISABLE,DISCARD,DISK,DISTINCT,DISTINCTROW,DIV,DO,DOUBLE,DROP,DUAL,DUMPFILE,DUPLICATE,DYNAMIC,EACH,ELSE,ELSEIF,ENABLE,ENCLOSED,ENCRYPTION,END,ENDS,ENGINE,ENGINES,ENUM,ERROR,ERRORS,ESCAPE,ESCAPED,EVENT,EVENTS,EVERY,EXCHANGE,EXECUTE,EXISTS,EXIT,EXPANSION,EXPIRE,EXPLAIN,EXPORT,EXTENDED,EXTENT_SIZE,FAST,FAULTS,FETCH,FIELDS,FILE,FILE_BLOCK_SIZE,FILTER,FIRST,FIXED,FLOAT,FLOAT4,FLOAT8,FLUSH,FOLLOWS,FOR,FORCE,FOREIGN,FORMAT,FOUND,FROM,FULL,FULLTEXT,FUNCTION,GENERAL,GENERATED,GEOMETRY,GEOMETRYCOLLECTION,GET,GET_FORMAT,GLOBAL,GRANT,GRANTS,GROUP,GROUP_REPLICATION,HANDLER,HASH,HAVING,HELP,HIGH_PRIORITY,HOST,HOSTS,HOUR,HOUR_MICROSECOND,HOUR_MINUTE,HOUR_SECOND,IDENTIFIED,IF,IGNORE,IGNORE_SERVER_IDS,IMPORT,IN,INDEX,INDEXES,INFILE,INITIAL_SIZE,INNER,INOUT,INSENSITIVE,INSERT,INSERT_METHOD,INSTALL,INSTANCE,INT,INT1,INT2,INT3,INT4,INT8,INTEGER,INTERVAL,INTO,INVOKER,IO,IO_AFTER_GTIDS,IO_BEFORE_GTIDS,IO_THREAD,IPC,IS,ISOLATION,ISSUER,ITERATE,JOIN,JSON,KEY,KEY_BLOCK_SIZE,KEYS,KILL,LANGUAGE,LAST,LEADING,LEAVE,LEAVES,LEFT,LESS,LEVEL,LIKE,LIMIT,LINEAR,LINES,LINESTRING,LIST,LOAD,LOCAL,LOCALTIME,LOCALTIMESTAMP,LOCK,LOCKS,LOGFILE,LOGS,LONG,LONGBLOB,LONGTEXT,LOOP,LOW_PRIORITY,MASTER,MASTER_AUTO_POSITION,MASTER_BIND,MASTER_CONNECT_RETRY,MASTER_DELAY,MASTER_HEARTBEAT_PERIOD,MASTER_HOST,MASTER_LOG_FILE,MASTER_LOG_POS,MASTER_PASSWORD,MASTER_PORT,MASTER_RETRY_COUNT,MASTER_SERVER_ID,MASTER_SSL,MASTER_SSL_CA,MASTER_SSL_CAPATH,MASTER_SSL_CERT,MASTER_SSL_CIPHER,MASTER_SSL_CRL,MASTER_SSL_CRLPATH,MASTER_SSL_KEY,MASTER_SSL_VERIFY_SERVER_CERT,MASTER_TLS_VERSION,MASTER_USER,MATCH,MAX_CONNECTIONS_PER_HOUR,MAX_QUERIES_PER_HOUR,MAX_ROWS,MAX_SIZE,MAX_STATEMENT_TIME,MAX_UPDATES_PER_HOUR,MAX_USER_CONNECTIONS,MAXVALUE,MEDIUM,MEDIUMBLOB,MEDIUMINT,MEDIUMTEXT,MEMORY,MERGE,MESSAGE_TEXT,MICROSECOND,MIDDLEINT,MIGRATE,MIN_ROWS,MINUTE,MINUTE_MICROSECOND,MINUTE_SECOND,MOD,MODE,MODIFIES,MODIFY,MONTH,MULTILINESTRING,MULTIPOINT,MULTIPOLYGON,MUTEX,MYSQL_ERRNO,NAME,NAMES,NATIONAL,NATURAL,NCHAR,NDB,NDBCLUSTER,NEVER,NEW,NEXT,NO,NO_WAIT,NO_WRITE_TO_BINLOG,NODEGROUP,NONBLOCKING,NONE,NOT,NULL,NUMBER,NUMERIC,NVARCHAR,OFFSET,OLD_PASSWORD,ON,ONE,ONLY,OPEN,OPTIMIZE,OPTIMIZER_COSTS,OPTION,OPTIONALLY,OPTIONS,OR,ORDER,OUT,OUTER,OUTFILE,OWNER,PACK_KEYS,PAGE,PARSE_GCOL_EXPR,PARSER,PARTIAL,PARTITION,PARTITIONING,PARTITIONS,PASSWORD,PHASE,PLUGIN,PLUGIN_DIR,PLUGINS,POINT,POLYGON,PORT,PRECEDES,PRECISION,PREPARE,PRESERVE,PREV,PRIMARY,PRIVILEGES,PROCEDURE,PROCESSLIST,PROFILE,PROFILES,PROXY,PURGE,QUARTER,QUERY,QUICK,RANGE,READ,READ_ONLY,READ_WRITE,READS,REAL,REBUILD,RECOVER,REDO_BUFFER_SIZE,REDOFILE,REDUNDANT,REFERENCES,REGEXP,RELAY,RELAY_LOG_FILE,RELAY_LOG_POS,RELAY_THREAD,RELAYLOG,RELEASE,RELOAD,REMOVE,RENAME,REORGANIZE,REPAIR,REPEAT,REPEATABLE,REPLACE,REPLICATE_DO_DB,REPLICATE_DO_TABLE,REPLICATE_IGNORE_DB,REPLICATE_IGNORE_TABLE,REPLICATE_REWRITE_DB,REPLICATE_WILD_DO_TABLE,REPLICATE_WILD_IGNORE_TABLE,REPLICATION,REQUIRE,RESET,RESIGNAL,RESTORE,RESTRICT,RESUME,RETURN,RETURNED_SQLSTATE,RETURNS,REVERSE,REVOKE,RIGHT,RLIKE,ROLLBACK,ROLLUP,ROTATE,ROUTINE,ROW,ROW_COUNT,ROW_FORMAT,ROWS,RTREE,SAVEPOINT,SCHEDULE,SCHEMA,SCHEMA_NAME,SCHEMAS,SECOND,SECOND_MICROSECOND,SECURITY,SELECT,SENSITIVE,SEPARATOR,SERIAL,SERIALIZABLE,SERVER,SESSION,SET,SHARE,SHOW,SHUTDOWN,SIGNAL,SIGNED,SIMPLE,SLAVE,SLOW,SMALLINT,SNAPSHOT,SOCKET,SOME,SONAME,SOUNDS,SOURCE,SPATIAL,SPECIFIC,SQL,SQL_AFTER_GTIDS,SQL_AFTER_MTS_GAPS,SQL_BEFORE_GTIDS,SQL_BIG_RESULT,SQL_BUFFER_RESULT,SQL_CACHE,SQL_CALC_FOUND_ROWS,SQL_NO_CACHE,SQL_SMALL_RESULT,SQL_THREAD,SQL_TSI_DAY,SQL_TSI_HOUR,SQL_TSI_MINUTE,SQL_TSI_MONTH,SQL_TSI_QUARTER,SQL_TSI_SECOND,SQL_TSI_WEEK,SQL_TSI_YEAR,SQLEXCEPTION,SQLSTATE,SQLWARNING,SSL,STACKED,START,STARTING,STARTS,STATS_AUTO_RECALC,STATS_PERSISTENT,STATS_SAMPLE_PAGES,STATUS,STOP,STORAGE,STORED,STRAIGHT_JOIN,STRING,SUBCLASS_ORIGIN,SUBJECT,SUBPARTITION,SUBPARTITIONS,SUPER,SUSPEND,SWAPS,SWITCHES,TABLE,TABLE_CHECKSUM,TABLE_NAME,TABLES,TABLESPACE,TEMPORARY,TEMPTABLE,TERMINATED,TEXT,THAN,THEN,TIME,TIMESTAMP,TIMESTAMPADD,TIMESTAMPDIFF,TINYBLOB,TINYINT,TINYTEXT,TO,TRAILING,TRANSACTION,TRIGGER,TRIGGERS,TRUNCATE,TYPE,TYPES,UNCOMMITTED,UNDEFINED,UNDO,UNDO_BUFFER_SIZE,UNDOFILE,UNICODE,UNINSTALL,UNION,UNIQUE,UNKNOWN,UNLOCK,UNSIGNED,UNTIL,UPDATE,UPGRADE,USAGE,USE,USE_FRM,USER,USER_RESOURCES,USING,UTC_DATE,UTC_TIME,UTC_TIMESTAMP,VALIDATION,VALUE,VALUES,VARBINARY,VARCHAR,VARCHARACTER,VARIABLES,VARYING,VIEW,VIRTUAL,WAIT,WARNINGS,WEEK,WEIGHT_STRING,WHEN,WHERE,WHILE,WITH,WITHOUT,WORK,WRAPPER,WRITE,X509,XA,XID,XML,XOR,YEAR,YEAR_MONTH,ZEROFILL,FALSE,TRUE";
+ private static final String ORACLE_KEYWORDS = "ACCESS,ADD,ALL,ALTER,AND,ANY,ARRAYLEN,AS,ASC,AUDIT,BETWEEN,BY,CHAR,CHECK,CLUSTER,COLUMN,COMMENT,COMPRESS,CONNECT,CREATE,CURRENT,DATE,DECIMAL,DEFAULT,DELETE,DESC,DISTINCT,DROP,ELSE,EXCLUSIVE,EXISTS,FILE,FLOAT,FOR,FROM,GRANT,GROUP,HAVING,IDENTIFIED,IMMEDIATE,IN,INCREMENT,INDEX,INITIAL,INSERT,INTEGER,INTERSECT,INTO,IS,LEVEL,LIKE,LOCK,LONG,MAXEXTENTS,MINUS,MODE,MODIFY,NOAUDIT,NOCOMPRESS,NOT,NOTFOUND,NOWAIT,NULL,NUMBER,OF,OFFLINE,ON,ONLINE,OPTION,OR,ORDER,PCTFREE,PRIOR,PRIVILEGES,PUBLIC,RAW,RENAME,RESOURCE,REVOKE,ROW,ROWID,ROWLABEL,ROWNUM,ROWS,SELECT,SESSION,SET,SHARE,SIZE,SMALLINT,SQLBUF,START,SUCCESSFUL,SYNONYM,TABLE,THEN,TO,TRIGGER,UID,UNION,UNIQUE,UPDATE,USER,VALIDATE,VALUES,VARCHAR,VARCHAR2,VIEW,WHENEVER,WHERE,WITH";
private static String CHECK_MEMSTORE = "select 1 from %s.gv$memstore t where t.total>t.mem_limit * ?";
-
+ private static Set databaseKeywords;
private static String compatibleMode = null;
+ protected static final Logger LOG = LoggerFactory.getLogger(Task.class);
+ private static Set keywordsFromString2HashSet(final String keywords) {
+ return new HashSet(Arrays.asList(keywords.split(",")));
+ }
+ public static String escapeDatabaseKeywords(String keyword) {
+ if (databaseKeywords == null) {
+ if (isOracleMode()) {
+ databaseKeywords = keywordsFromString2HashSet(ORACLE_KEYWORDS);
+ } else {
+ databaseKeywords = keywordsFromString2HashSet(MYSQL_KEYWORDS);
+ }
+ }
+ char escapeChar = isOracleMode() ? '"' : '`';
+ if (databaseKeywords.contains(keyword.toUpperCase())) {
+ keyword = escapeChar + keyword + escapeChar;
+ }
+ return keyword;
+ }
+
+ public static void escapeDatabaseKeywords(List keywords) {
+ for (int i = 0; i < keywords.size(); i++) {
+ keywords.set(i, escapeDatabaseKeywords(keywords.get(i)));
+ }
+ }
+ public static Boolean isEscapeMode(String keyword){
+ if(isOracleMode()){
+ return keyword.startsWith("\"") && keyword.endsWith("\"");
+ }else{
+ return keyword.startsWith("`") && keyword.endsWith("`");
+ }
+ }
public static boolean isMemstoreFull(Connection conn, double memstoreThreshold) {
PreparedStatement ps = null;
ResultSet rs = null;
@@ -70,7 +102,11 @@ public class ObWriterUtils {
}
private static int[] getColumnIndex(List columnsInIndex, List allColumns) {
- allColumns = allColumns.stream().map(String::toUpperCase).collect(Collectors.toList());
+ for (int i = 0; i < allColumns.size(); i++) {
+ if (!ObWriterUtils.isEscapeMode(allColumns.get(i))) {
+ allColumns.set(i, allColumns.get(i).toUpperCase());
+ }
+ }
int[] colIdx = new int[columnsInIndex.size()];
for (int i = 0; i < columnsInIndex.size(); i++) {
int index = allColumns.indexOf(columnsInIndex.get(i));
@@ -122,7 +158,11 @@ public class ObWriterUtils {
rs = stmt.executeQuery(sql);
while (rs.next()) {
String keyName = rs.getString("Key_name");
- String columnName = StringUtils.upperCase(rs.getString("Column_name"));
+ String columnName = rs.getString("Column_name");
+ columnName=escapeDatabaseKeywords(columnName);
+ if(!ObWriterUtils.isEscapeMode(columnName)){
+ columnName = columnName.toUpperCase();
+ }
List s = uniqueKeys.get(keyName);
if (s == null) {
s = new ArrayList();
@@ -135,6 +175,7 @@ public class ObWriterUtils {
} finally {
asyncClose(rs, stmt, null);
}
+ //ObWriterUtils.escapeDatabaseKeywords(uniqueKeys);
return uniqueKeys;
}
@@ -291,6 +332,7 @@ public class ObWriterUtils {
* @param e
* @return
*/
+
public static boolean isFatalError(SQLException e) {
String sqlState = e.getSQLState();
if (StringUtils.startsWith(sqlState, "08")) {
diff --git a/oscarwriter/pom.xml b/oscarwriter/pom.xml
index 51643c76..06249a26 100644
--- a/oscarwriter/pom.xml
+++ b/oscarwriter/pom.xml
@@ -39,12 +39,18 @@
plugin-rdbms-util
${datax-project-version}
-
+
+
+
+ com.csicit.thirdparty
+ oscar
+ 1.0.1
diff --git a/otsreader/pom.xml b/otsreader/pom.xml
index bd017423..b1e0e735 100644
--- a/otsreader/pom.xml
+++ b/otsreader/pom.xml
@@ -34,6 +34,16 @@
com.aliyun.openservices
ots-public
2.2.4
+
+
+ log4j-api
+ org.apache.logging.log4j
+
+
+ log4j-core
+ org.apache.logging.log4j
+
+
com.google.code.gson
diff --git a/otsstreamreader/pom.xml b/otsstreamreader/pom.xml
index 2a12872f..84ca2d6a 100644
--- a/otsstreamreader/pom.xml
+++ b/otsstreamreader/pom.xml
@@ -33,6 +33,16 @@
com.aliyun.openservices
tablestore-streamclient
1.0.0
+
+
+ log4j-api
+ org.apache.logging.log4j
+
+
+ log4j-core
+ org.apache.logging.log4j
+
+
com.google.code.gson
diff --git a/otswriter/pom.xml b/otswriter/pom.xml
index 8677c8ab..d40f68b3 100644
--- a/otswriter/pom.xml
+++ b/otswriter/pom.xml
@@ -34,6 +34,16 @@
com.aliyun.openservices
ots-public
2.2.4
+
+
+ log4j-api
+ org.apache.logging.log4j
+
+
+ log4j-core
+ org.apache.logging.log4j
+
+
com.google.code.gson
diff --git a/package.xml b/package.xml
index 3f024f9d..fd9c8b52 100755
--- a/package.xml
+++ b/package.xml
@@ -371,6 +371,13 @@
datax
+
+ tsdbreader/target/datax/
+
+ **/*.*
+
+ datax
+
adbpgwriter/target/datax/
diff --git a/pom.xml b/pom.xml
index 72241137..5b00bb3a 100644
--- a/pom.xml
+++ b/pom.xml
@@ -184,6 +184,17 @@
1.9.5
test
+
+ org.apache.logging.log4j
+ log4j-api
+ 2.17.1
+
+
+
+ org.apache.logging.log4j
+ log4j-core
+ 2.17.1
+
diff --git a/postgresqlwriter/src/main/java/com/alibaba/datax/plugin/writer/postgresqlwriter/PostgresqlWriter.java b/postgresqlwriter/src/main/java/com/alibaba/datax/plugin/writer/postgresqlwriter/PostgresqlWriter.java
index 22dc0c1e..2d38db35 100755
--- a/postgresqlwriter/src/main/java/com/alibaba/datax/plugin/writer/postgresqlwriter/PostgresqlWriter.java
+++ b/postgresqlwriter/src/main/java/com/alibaba/datax/plugin/writer/postgresqlwriter/PostgresqlWriter.java
@@ -67,6 +67,8 @@ public class PostgresqlWriter extends Writer {
public String calcValueHolder(String columnType){
if("serial".equalsIgnoreCase(columnType)){
return "?::int";
+ }else if("bigserial".equalsIgnoreCase(columnType)){
+ return "?::int8";
}else if("bit".equalsIgnoreCase(columnType)){
return "?::bit varying";
}
diff --git a/rdbmsreader/doc/rdbmsreader.md b/rdbmsreader/doc/rdbmsreader.md
index dd3039e9..fd8ae103 100644
--- a/rdbmsreader/doc/rdbmsreader.md
+++ b/rdbmsreader/doc/rdbmsreader.md
@@ -138,7 +138,7 @@ RDBMSReader插件实现了从RDBMS读取数据。在底层实现上,RDBMSReade
**rdbmswriter如何增加新的数据库支持:**
- - 进入rdbmsreader对应目录,这里${DATAX_HOME}为DataX主目录,即: ${DATAX_HOME}/plugin/reader/rdbmswriter
+ - 进入rdbmsreader对应目录,这里${DATAX_HOME}为DataX主目录,即: ${DATAX_HOME}/plugin/reader/rdbmsreader
- 在rdbmsreader插件目录下有plugin.json配置文件,在此文件中注册您具体的数据库驱动,具体放在drivers数组中。rdbmsreader插件在任务执行时会动态选择合适的数据库驱动连接数据库。
diff --git a/tdenginewriter/doc/tdenginewriter-CN.md b/tdenginewriter/doc/tdenginewriter-CN.md
index a868f8f2..ac5a555a 100644
--- a/tdenginewriter/doc/tdenginewriter-CN.md
+++ b/tdenginewriter/doc/tdenginewriter-CN.md
@@ -232,14 +232,14 @@ TAGS(
#### 3.2.6 类型转换
-| MongoDB 数据类型 | DataX 内部类型 | TDengine 数据类型 |
-| ---------------- | -------------- | ----------------- |
-| int, Long | Long | BIGINT |
-| double | Double | DOUBLE |
-| string, array | String | NCHAR(64) |
-| date | Date | TIMESTAMP |
-| boolean | Boolean | BOOL |
-| bytes | Bytes | BINARY(64) |
+| DataX 内部类型 | TDengine 数据类型 |
+|-------------- | ----------------- |
+|Long | BIGINT |
+|Double | DOUBLE |
+|String | NCHAR(64) |
+|Date | TIMESTAMP |
+|Boolean | BOOL |
+|Bytes | BINARY(64) |
### 3.3 从关系型数据库到TDengine
writer部分的配置规则和上述MongoDB的示例是一样的,这里给出一个MySQL的示例。
diff --git a/tdenginewriter/doc/tdenginewriter.md b/tdenginewriter/doc/tdenginewriter.md
index 6f19c112..fb8cf642 100644
--- a/tdenginewriter/doc/tdenginewriter.md
+++ b/tdenginewriter/doc/tdenginewriter.md
@@ -228,14 +228,14 @@ Then the first columns received by this writer plugin must represent timestamp,
#### 3.2.6 Type Convert
-| MongoDB Type | DataX Type | TDengine Type |
-| ---------------- | -------------- | ----------------- |
-| int, Long | Long | BIGINT |
-| double | Double | DOUBLE |
-| string, array | String | NCHAR(64) |
-| date | Date | TIMESTAMP |
-| boolean | Boolean | BOOL |
-| bytes | Bytes | BINARY(64) |
+|DataX Type | TDengine Type |
+|-------------- | ----------------- |
+|Long | BIGINT |
+|Double | DOUBLE |
+|String | NCHAR(64) |
+|Date | TIMESTAMP |
+|Boolean | BOOL |
+|Bytes | BINARY(64) |
### 3.3 From Relational Database to TDengine
diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Constant.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Constant.java
index e42dedc0..f5069dc9 100644
--- a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Constant.java
+++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Constant.java
@@ -16,6 +16,8 @@ public final class Constant {
static final String DEFAULT_DATA_FORMAT = "yyyy-MM-dd HH:mm:ss";
public static final String METRIC_SPECIFY_KEY = "__metric__";
+ public static final String METRIC_SPECIFY_KEY_PREFIX = METRIC_SPECIFY_KEY + ".";
+ public static final int METRIC_SPECIFY_KEY_PREFIX_LENGTH = METRIC_SPECIFY_KEY_PREFIX.length();
public static final String TS_SPECIFY_KEY = "__ts__";
public static final String VALUE_SPECIFY_KEY = "__value__";
diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Key.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Key.java
index 14ee7e41..c8a3d7ae 100644
--- a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Key.java
+++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/Key.java
@@ -17,14 +17,19 @@ public class Key {
// RDB for MySQL / ADB etc.
static final String SINK_DB_TYPE = "sinkDbType";
static final String ENDPOINT = "endpoint";
+ static final String USERNAME = "username";
+ static final String PASSWORD = "password";
static final String COLUMN = "column";
static final String METRIC = "metric";
static final String FIELD = "field";
static final String TAG = "tag";
+ static final String COMBINE = "combine";
static final String INTERVAL_DATE_TIME = "splitIntervalMs";
static final String BEGIN_DATE_TIME = "beginDateTime";
static final String END_DATE_TIME = "endDateTime";
+ static final String HINT = "hint";
+ static final Boolean COMBINE_DEFAULT_VALUE = false;
static final Integer INTERVAL_DATE_TIME_DEFAULT_VALUE = 60;
static final String TYPE_DEFAULT_VALUE = "TSDB";
static final Set TYPE_SET = new HashSet<>();
diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReader.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReader.java
index 04b931c7..550a010a 100755
--- a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReader.java
+++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/TSDBReader.java
@@ -60,6 +60,15 @@ public class TSDBReader extends Reader {
"The parameter [" + Key.ENDPOINT + "] is not set.");
}
+ String username = originalConfig.getString(Key.USERNAME, null);
+ if (StringUtils.isBlank(username)) {
+ LOG.warn("The parameter [" + Key.USERNAME + "] is blank.");
+ }
+ String password = originalConfig.getString(Key.PASSWORD, null);
+ if (StringUtils.isBlank(password)) {
+ LOG.warn("The parameter [" + Key.PASSWORD + "] is blank.");
+ }
+
// tagK / field could be empty
if ("TSDB".equals(type)) {
List columns = originalConfig.getList(Key.COLUMN, String.class);
@@ -76,7 +85,14 @@ public class TSDBReader extends Reader {
"The parameter [" + Key.COLUMN + "] is not set.");
}
for (String specifyKey : Constant.MUST_CONTAINED_SPECIFY_KEYS) {
- if (!columns.contains(specifyKey)) {
+ boolean containSpecifyKey = false;
+ for (String column : columns) {
+ if (column.startsWith(specifyKey)) {
+ containSpecifyKey = true;
+ break;
+ }
+ }
+ if (!containSpecifyKey) {
throw DataXException.asDataXException(
TSDBReaderErrorCode.ILLEGAL_VALUE,
"The parameter [" + Key.COLUMN + "] should contain "
@@ -99,6 +115,8 @@ public class TSDBReader extends Reader {
"The parameter [" + Key.INTERVAL_DATE_TIME + "] should be great than zero.");
}
+ Boolean isCombine = originalConfig.getBool(Key.COMBINE, Key.COMBINE_DEFAULT_VALUE);
+
SimpleDateFormat format = new SimpleDateFormat(Constant.DEFAULT_DATA_FORMAT);
String startTime = originalConfig.getString(Key.BEGIN_DATE_TIME);
Long startDate;
@@ -168,14 +186,14 @@ public class TSDBReader extends Reader {
startTime = format.parse(originalConfig.getString(Key.BEGIN_DATE_TIME)).getTime();
} catch (ParseException e) {
throw DataXException.asDataXException(
- TSDBReaderErrorCode.ILLEGAL_VALUE, "解析[" + Key.BEGIN_DATE_TIME + "]失败.", e);
+ TSDBReaderErrorCode.ILLEGAL_VALUE, "Analysis [" + Key.BEGIN_DATE_TIME + "] failed.", e);
}
long endTime;
try {
endTime = format.parse(originalConfig.getString(Key.END_DATE_TIME)).getTime();
} catch (ParseException e) {
throw DataXException.asDataXException(
- TSDBReaderErrorCode.ILLEGAL_VALUE, "解析[" + Key.END_DATE_TIME + "]失败.", e);
+ TSDBReaderErrorCode.ILLEGAL_VALUE, "Analysis [" + Key.END_DATE_TIME + "] failed.", e);
}
if (TimeUtils.isSecond(startTime)) {
startTime *= 1000;
@@ -186,13 +204,14 @@ public class TSDBReader extends Reader {
DateTime startDateTime = new DateTime(TimeUtils.getTimeInHour(startTime));
DateTime endDateTime = new DateTime(TimeUtils.getTimeInHour(endTime));
+ final Boolean isCombine = originalConfig.getBool(Key.COMBINE, Key.COMBINE_DEFAULT_VALUE);
+
if ("TSDB".equals(type)) {
- // split by metric
- for (String column : columns4TSDB) {
+ if (isCombine) {
// split by time in hour
while (startDateTime.isBefore(endDateTime)) {
Configuration clone = this.originalConfig.clone();
- clone.set(Key.COLUMN, Collections.singletonList(column));
+ clone.set(Key.COLUMN, columns4TSDB);
clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis());
startDateTime = startDateTime.plusMillis(splitIntervalMs);
@@ -202,15 +221,30 @@ public class TSDBReader extends Reader {
LOG.info("Configuration: {}", JSON.toJSONString(clone));
}
+ } else {
+ // split by time in hour
+ while (startDateTime.isBefore(endDateTime)) {
+ // split by metric
+ for (String column : columns4TSDB) {
+ Configuration clone = this.originalConfig.clone();
+ clone.set(Key.COLUMN, Collections.singletonList(column));
+
+ clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis());
+ startDateTime = startDateTime.plusMillis(splitIntervalMs);
+ // Make sure the time interval is [start, end).
+ clone.set(Key.END_DATE_TIME, startDateTime.getMillis() - 1);
+ configurations.add(clone);
+
+ LOG.info("Configuration: {}", JSON.toJSONString(clone));
+ }
+ }
}
} else {
- // split by metric
- for (String metric : metrics) {
- // split by time in hour
+ if (isCombine) {
while (startDateTime.isBefore(endDateTime)) {
Configuration clone = this.originalConfig.clone();
clone.set(Key.COLUMN, columns4RDB);
- clone.set(Key.METRIC, Collections.singletonList(metric));
+ clone.set(Key.METRIC, metrics);
clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis());
startDateTime = startDateTime.plusMillis(splitIntervalMs);
@@ -220,6 +254,24 @@ public class TSDBReader extends Reader {
LOG.info("Configuration: {}", JSON.toJSONString(clone));
}
+ } else {
+ // split by time in hour
+ while (startDateTime.isBefore(endDateTime)) {
+ // split by metric
+ for (String metric : metrics) {
+ Configuration clone = this.originalConfig.clone();
+ clone.set(Key.COLUMN, columns4RDB);
+ clone.set(Key.METRIC, Collections.singletonList(metric));
+
+ clone.set(Key.BEGIN_DATE_TIME, startDateTime.getMillis());
+ startDateTime = startDateTime.plusMillis(splitIntervalMs);
+ // Make sure the time interval is [start, end).
+ clone.set(Key.END_DATE_TIME, startDateTime.getMillis() - 1);
+ configurations.add(clone);
+
+ LOG.info("Configuration: {}", JSON.toJSONString(clone));
+ }
+ }
}
}
return configurations;
@@ -247,6 +299,8 @@ public class TSDBReader extends Reader {
private TSDBConnection conn;
private Long startTime;
private Long endTime;
+ private Boolean isCombine;
+ private Map hint;
@Override
public void init() {
@@ -265,11 +319,16 @@ public class TSDBReader extends Reader {
this.tags = readerSliceConfig.getMap(Key.TAG);
String address = readerSliceConfig.getString(Key.ENDPOINT);
+ String username = readerSliceConfig.getString(Key.USERNAME);
+ String password = readerSliceConfig.getString(Key.PASSWORD);
- conn = new TSDBConnection(address);
+ conn = new TSDBConnection(address, username, password);
this.startTime = readerSliceConfig.getLong(Key.BEGIN_DATE_TIME);
this.endTime = readerSliceConfig.getLong(Key.END_DATE_TIME);
+
+ this.isCombine = readerSliceConfig.getBool(Key.COMBINE, Key.COMBINE_DEFAULT_VALUE);
+ this.hint = readerSliceConfig.getMap(Key.HINT);
}
@Override
@@ -283,29 +342,35 @@ public class TSDBReader extends Reader {
if ("TSDB".equals(type)) {
for (String metric : columns4TSDB) {
final Map tags = this.tags == null ?
- null : (Map) this.tags.get(metric);
+ null : (Map) this.tags.get(metric);
if (fields == null || !fields.containsKey(metric)) {
- conn.sendDPs(metric, tags, this.startTime, this.endTime, recordSender);
+ conn.sendDPs(metric, tags, this.startTime, this.endTime, recordSender, hint);
} else {
conn.sendDPs(metric, (List) fields.get(metric),
- tags, this.startTime, this.endTime, recordSender);
+ tags, this.startTime, this.endTime, recordSender, hint);
}
}
} else {
- for (String metric : metrics) {
+ if (isCombine) {
final Map tags = this.tags == null ?
- null : (Map) this.tags.get(metric);
- if (fields == null || !fields.containsKey(metric)) {
- conn.sendRecords(metric, tags, startTime, endTime, columns4RDB, recordSender);
- } else {
- conn.sendRecords(metric, (List) fields.get(metric),
- tags, startTime, endTime, columns4RDB, recordSender);
+ null : (Map) this.tags.get(metrics.get(0));
+ conn.sendRecords(metrics, tags, startTime, endTime, columns4RDB, recordSender, hint);
+ } else {
+ for (String metric : metrics) {
+ final Map tags = this.tags == null ?
+ null : (Map) this.tags.get(metric);
+ if (fields == null || !fields.containsKey(metric)) {
+ conn.sendRecords(metric, tags, startTime, endTime, columns4RDB, isCombine, recordSender, hint);
+ } else {
+ conn.sendRecords(metric, (List) fields.get(metric),
+ tags, startTime, endTime, columns4RDB, recordSender, hint);
+ }
}
}
}
} catch (Exception e) {
throw DataXException.asDataXException(
- TSDBReaderErrorCode.ILLEGAL_VALUE, "获取或发送数据点的过程中出错!", e);
+ TSDBReaderErrorCode.ILLEGAL_VALUE, "Error in getting or sending data point!", e);
}
}
diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/Connection4TSDB.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/Connection4TSDB.java
index 500894bb..96cb7f9d 100644
--- a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/Connection4TSDB.java
+++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/Connection4TSDB.java
@@ -22,6 +22,20 @@ public interface Connection4TSDB {
*/
String address();
+ /**
+ * Get the address of Database.
+ *
+ * @return host+ip
+ */
+ String username();
+
+ /**
+ * Get the address of Database.
+ *
+ * @return host+ip
+ */
+ String password();
+
/**
* Get the version of Database.
*
@@ -46,22 +60,27 @@ public interface Connection4TSDB {
/**
* Send data points for TSDB with single field.
*/
- void sendDPs(String metric, Map tags, Long start, Long end, RecordSender recordSender) throws Exception;
+ void sendDPs(String metric, Map tags, Long start, Long end, RecordSender recordSender, Map hint) throws Exception;
/**
* Send data points for TSDB with multi fields.
*/
- void sendDPs(String metric, List fields, Map tags, Long start, Long end, RecordSender recordSender) throws Exception;
+ void sendDPs(String metric, List fields, Map tags, Long start, Long end, RecordSender recordSender, Map hint) throws Exception;
/**
* Send data points for RDB with single field.
*/
- void sendRecords(String metric, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender) throws Exception;
+ void sendRecords(String metric, Map tags, Long start, Long end, List columns4RDB, Boolean isCombine, RecordSender recordSender, Map hint) throws Exception;
/**
* Send data points for RDB with multi fields.
*/
- void sendRecords(String metric, List fields, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender) throws Exception;
+ void sendRecords(String metric, List fields, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender, Map hint) throws Exception;
+
+ /**
+ * Send data points for RDB with single fields on combine mode.
+ */
+ void sendRecords(List metrics, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender, Map hint) throws Exception;
/**
* Put data point.
diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnection.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnection.java
index 5426ab49..d466da39 100644
--- a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnection.java
+++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnection.java
@@ -19,9 +19,13 @@ import java.util.Map;
public class TSDBConnection implements Connection4TSDB {
private String address;
+ private String username;
+ private String password;
- public TSDBConnection(String address) {
+ public TSDBConnection(String address, String username, String password) {
this.address = address;
+ this.username = username;
+ this.password = password;
}
@Override
@@ -29,14 +33,24 @@ public class TSDBConnection implements Connection4TSDB {
return address;
}
+ @Override
+ public String username() {
+ return username;
+ }
+
+ @Override
+ public String password() {
+ return password;
+ }
+
@Override
public String version() {
- return TSDBUtils.version(address);
+ return TSDBUtils.version(address, username, password);
}
@Override
public String config() {
- return TSDBUtils.config(address);
+ return TSDBUtils.config(address, username, password);
}
@Override
@@ -45,23 +59,28 @@ public class TSDBConnection implements Connection4TSDB {
}
@Override
- public void sendDPs(String metric, Map tags, Long start, Long end, RecordSender recordSender) throws Exception {
- TSDBDump.dump4TSDB(this, metric, tags, start, end, recordSender);
+ public void sendDPs(String metric, Map tags, Long start, Long end, RecordSender recordSender, Map hint) throws Exception {
+ TSDBDump.dump4TSDB(this, metric, tags, start, end, recordSender, hint);
}
@Override
- public void sendDPs(String metric, List fields, Map tags, Long start, Long end, RecordSender recordSender) throws Exception {
- TSDBDump.dump4TSDB(this, metric, fields, tags, start, end, recordSender);
+ public void sendDPs(String metric, List fields, Map tags, Long start, Long end, RecordSender recordSender, Map hint) throws Exception {
+ TSDBDump.dump4TSDB(this, metric, fields, tags, start, end, recordSender, hint);
}
@Override
- public void sendRecords(String metric, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender) throws Exception {
- TSDBDump.dump4RDB(this, metric, tags, start, end, columns4RDB, recordSender);
+ public void sendRecords(String metric, Map tags, Long start, Long end, List columns4RDB, Boolean isCombine, RecordSender recordSender, Map hint) throws Exception {
+ TSDBDump.dump4RDB(this, metric, tags, start, end, columns4RDB, recordSender, hint);
}
@Override
- public void sendRecords(String metric, List fields, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender) throws Exception {
- TSDBDump.dump4RDB(this, metric, fields, tags, start, end, columns4RDB, recordSender);
+ public void sendRecords(List metrics, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender, Map hint) throws Exception {
+ TSDBDump.dump4RDB(this, metrics, tags, start, end, columns4RDB, recordSender, hint);
+ }
+
+ @Override
+ public void sendRecords(String metric, List fields, Map tags, Long start, Long end, List columns4RDB, RecordSender recordSender, Map hint) throws Exception {
+ TSDBDump.dump4RDB(this, metric, fields, tags, start, end, columns4RDB, recordSender, hint);
}
@Override
diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBDump.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBDump.java
index 8bae3a70..c911a062 100644
--- a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBDump.java
+++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBDump.java
@@ -9,10 +9,9 @@ import com.alibaba.fastjson.parser.Feature;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
+
+import static com.alibaba.datax.plugin.reader.tsdbreader.Constant.METRIC_SPECIFY_KEY_PREFIX_LENGTH;
/**
* Copyright @ 2019 alibaba.com
@@ -37,10 +36,10 @@ final class TSDBDump {
}
static void dump4TSDB(TSDBConnection conn, String metric, Map tags,
- Long start, Long end, RecordSender sender) throws Exception {
+ Long start, Long end, RecordSender sender, Map hint) throws Exception {
LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end);
- String res = queryRange4SingleField(conn, metric, tags, start, end);
+ String res = queryRange4SingleField(conn, metric, tags, start, end, hint);
List dps = getDps4TSDB(metric, res);
if (dps == null || dps.isEmpty()) {
return;
@@ -49,10 +48,10 @@ final class TSDBDump {
}
static void dump4TSDB(TSDBConnection conn, String metric, List fields, Map tags,
- Long start, Long end, RecordSender sender) throws Exception {
+ Long start, Long end, RecordSender sender, Map hint) throws Exception {
LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end);
- String res = queryRange4MultiFields(conn, metric, fields, tags, start, end);
+ String res = queryRange4MultiFields(conn, metric, fields, tags, start, end, hint);
List dps = getDps4TSDB(metric, fields, res);
if (dps == null || dps.isEmpty()) {
return;
@@ -61,10 +60,10 @@ final class TSDBDump {
}
static void dump4RDB(TSDBConnection conn, String metric, Map tags,
- Long start, Long end, List columns4RDB, RecordSender sender) throws Exception {
+ Long start, Long end, List columns4RDB, RecordSender sender, Map hint) throws Exception {
LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end);
- String res = queryRange4SingleField(conn, metric, tags, start, end);
+ String res = queryRange4SingleField(conn, metric, tags, start, end, hint);
List dps = getDps4RDB(metric, res);
if (dps == null || dps.isEmpty()) {
return;
@@ -92,12 +91,71 @@ final class TSDBDump {
}
}
+ public static void dump4RDB(TSDBConnection conn, List metrics, Map tags, Long start, Long end, List columns4RDB, RecordSender sender, Map hint) throws Exception {
+ LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metrics, start, end);
+
+ List dps = new LinkedList<>();
+ for (String metric : metrics) {
+ String res = queryRange4SingleField(conn, metric, tags, start, end, hint);
+ final List dpList = getDps4RDB(metric, res);
+ if (dpList == null || dpList.isEmpty()) {
+ continue;
+ }
+ dps.addAll(dpList);
+ }
+ if (dps.isEmpty()) {
+ return;
+ }
+ Map> dpsCombinedByTs = new LinkedHashMap<>();
+ for (DataPoint4TSDB dp : dps) {
+ final long ts = dp.getTimestamp();
+ final Map dpsWithSameTs = dpsCombinedByTs.computeIfAbsent(ts, k -> new LinkedHashMap<>());
+ dpsWithSameTs.put(dp.getMetric(), dp);
+ }
+
+ for (Map.Entry> entry : dpsCombinedByTs.entrySet()) {
+ final Long ts = entry.getKey();
+ final Map metricAndDps = entry.getValue();
+ final Record record = sender.createRecord();
+ DataPoint4TSDB tmpDp = null;
+
+ for (final String column : columns4RDB) {
+ if (column.startsWith(Constant.METRIC_SPECIFY_KEY)) {
+ final String m = column.substring(METRIC_SPECIFY_KEY_PREFIX_LENGTH);
+ tmpDp = metricAndDps.get(m);
+ if (tmpDp == null) {
+ continue;
+ }
+ record.addColumn(getColumn(tmpDp.getValue()));
+ } else if (Constant.TS_SPECIFY_KEY.equals(column)) {
+ record.addColumn(new LongColumn(ts));
+ } else if (Constant.VALUE_SPECIFY_KEY.equals(column)) {
+ // combine 模式下,不应该定义 __value__ 字段,因为 __metric__.xxx 字段会输出对应的 value 值
+ throw new RuntimeException("The " + Constant.VALUE_SPECIFY_KEY +
+ " column should not be specified in combine mode!");
+ } else {
+ // combine 模式下,应该确保 __metric__.xxx 字段的定义,放在 column 数组的最前面,以保证获取到 metric
+ if (tmpDp == null) {
+ throw new RuntimeException("These " + Constant.METRIC_SPECIFY_KEY_PREFIX +
+ " column should be placed first in the column array in combine mode!");
+ }
+ final Object tagv = tmpDp.getTags().get(column);
+ if (tagv == null) {
+ continue;
+ }
+ record.addColumn(getColumn(tagv));
+ }
+ }
+ sender.sendToWriter(record);
+ }
+ }
+
static void dump4RDB(TSDBConnection conn, String metric, List fields,
Map tags, Long start, Long end,
- List columns4RDB, RecordSender sender) throws Exception {
+ List columns4RDB, RecordSender sender, Map hint) throws Exception {
LOG.info("conn address: {}, metric: {}, start: {}, end: {}", conn.address(), metric, start, end);
- String res = queryRange4MultiFields(conn, metric, fields, tags, start, end);
+ String res = queryRange4MultiFields(conn, metric, fields, tags, start, end, hint);
List dps = getDps4RDB(metric, fields, res);
if (dps == null || dps.isEmpty()) {
return;
@@ -131,14 +189,16 @@ final class TSDBDump {
valueColumn = new LongColumn((Long) value);
} else if (value instanceof String) {
valueColumn = new StringColumn((String) value);
+ } else if (value instanceof Integer) {
+ valueColumn = new LongColumn(((Integer)value).longValue());
} else {
- throw new Exception(String.format("value 不支持类型: [%s]", value.getClass().getSimpleName()));
+ throw new Exception(String.format("value not supported type: [%s]", value.getClass().getSimpleName()));
}
return valueColumn;
}
private static String queryRange4SingleField(TSDBConnection conn, String metric, Map tags,
- Long start, Long end) throws Exception {
+ Long start, Long end, Map hint) throws Exception {
String tagKV = getFilterByTags(tags);
String body = "{\n" +
" \"start\": " + start + ",\n" +
@@ -148,14 +208,15 @@ final class TSDBDump {
" \"aggregator\": \"none\",\n" +
" \"metric\": \"" + metric + "\"\n" +
(tagKV == null ? "" : tagKV) +
+ (hint == null ? "" : (", \"hint\": " + JSON.toJSONString(hint))) +
" }\n" +
" ]\n" +
"}";
- return HttpUtils.post(conn.address() + QUERY, body);
+ return HttpUtils.post(conn.address() + QUERY, conn.username(), conn.password(), body);
}
private static String queryRange4MultiFields(TSDBConnection conn, String metric, List fields,
- Map tags, Long start, Long end) throws Exception {
+ Map tags, Long start, Long end, Map hint) throws Exception {
// fields
StringBuilder fieldBuilder = new StringBuilder();
fieldBuilder.append("\"fields\":[");
@@ -177,10 +238,11 @@ final class TSDBDump {
" \"metric\": \"" + metric + "\",\n" +
fieldBuilder.toString() +
(tagKV == null ? "" : tagKV) +
+ (hint == null ? "" : (", \"hint\": " + JSON.toJSONString(hint))) +
" }\n" +
" ]\n" +
"}";
- return HttpUtils.post(conn.address() + QUERY_MULTI_FIELD, body);
+ return HttpUtils.post(conn.address() + QUERY_MULTI_FIELD, conn.username(), conn.password(), body);
}
private static String getFilterByTags(Map tags) {
diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtils.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtils.java
index 3e0be854..5cba4e54 100644
--- a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtils.java
+++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtils.java
@@ -1,11 +1,13 @@
package com.alibaba.datax.plugin.reader.tsdbreader.util;
import com.alibaba.fastjson.JSON;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.fluent.Content;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.ContentType;
import java.nio.charset.StandardCharsets;
+import java.util.Base64;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -22,13 +24,18 @@ public final class HttpUtils {
public final static int CONNECT_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60);
public final static int SOCKET_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60);
+ private static final String CREDENTIALS_FORMAT = "%s:%s";
+ private static final String BASIC_AUTHENTICATION_FORMAT = "Basic %s";
+
private HttpUtils() {
}
- public static String get(String url) throws Exception {
- Content content = Request.Get(url)
+ public static String get(String url, String username, String password) throws Exception {
+ final Request request = Request.Get(url)
.connectTimeout(CONNECT_TIMEOUT_DEFAULT_IN_MILL)
- .socketTimeout(SOCKET_TIMEOUT_DEFAULT_IN_MILL)
+ .socketTimeout(SOCKET_TIMEOUT_DEFAULT_IN_MILL);
+ addAuth(request, username, password);
+ Content content = request
.execute()
.returnContent();
if (content == null) {
@@ -37,24 +44,21 @@ public final class HttpUtils {
return content.asString(StandardCharsets.UTF_8);
}
- public static String post(String url, Map params) throws Exception {
- return post(url, JSON.toJSONString(params), CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
+ public static String post(String url, String username, String password, Map params) throws Exception {
+ return post(url, username, password, JSON.toJSONString(params), CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
}
- public static String post(String url, String params) throws Exception {
- return post(url, params, CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
+ public static String post(String url, String username, String password, String params) throws Exception {
+ return post(url, username, password, params, CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
}
- public static String post(String url, Map params,
+ public static String post(String url, String username, String password, String params,
int connectTimeoutInMill, int socketTimeoutInMill) throws Exception {
- return post(url, JSON.toJSONString(params), connectTimeoutInMill, socketTimeoutInMill);
- }
-
- public static String post(String url, String params,
- int connectTimeoutInMill, int socketTimeoutInMill) throws Exception {
- Content content = Request.Post(url)
+ Request request = Request.Post(url)
.connectTimeout(connectTimeoutInMill)
- .socketTimeout(socketTimeoutInMill)
+ .socketTimeout(socketTimeoutInMill);
+ addAuth(request, username, password);
+ Content content = request
.addHeader("Content-Type", "application/json")
.bodyString(params, ContentType.APPLICATION_JSON)
.execute()
@@ -64,4 +68,20 @@ public final class HttpUtils {
}
return content.asString(StandardCharsets.UTF_8);
}
+
+ private static void addAuth(Request request, String username, String password) {
+ String authorization = generateHttpAuthorization(username, password);
+ if (authorization != null) {
+ request.setHeader("Authorization", authorization);
+ }
+ }
+
+ private static String generateHttpAuthorization(String username, String password) {
+ if (StringUtils.isBlank(username) || StringUtils.isBlank(password)) {
+ return null;
+ }
+ String credentials = String.format(CREDENTIALS_FORMAT, username, password);
+ credentials = Base64.getEncoder().encodeToString(credentials.getBytes());
+ return String.format(BASIC_AUTHENTICATION_FORMAT, credentials);
+ }
}
diff --git a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TSDBUtils.java b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TSDBUtils.java
index bb7b4b87..d91c3557 100644
--- a/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TSDBUtils.java
+++ b/tsdbreader/src/main/java/com/alibaba/datax/plugin/reader/tsdbreader/util/TSDBUtils.java
@@ -1,11 +1,5 @@
package com.alibaba.datax.plugin.reader.tsdbreader.util;
-import com.alibaba.datax.plugin.reader.tsdbreader.conn.DataPoint4TSDB;
-import com.alibaba.fastjson.JSON;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.List;
/**
* Copyright @ 2019 alibaba.com
@@ -17,52 +11,28 @@ import java.util.List;
*/
public final class TSDBUtils {
- private static final Logger LOGGER = LoggerFactory.getLogger(TSDBUtils.class);
-
private TSDBUtils() {
}
- public static String version(String address) {
+ public static String version(String address, String username, String password) {
String url = String.format("%s/api/version", address);
String rsp;
try {
- rsp = HttpUtils.get(url);
+ rsp = HttpUtils.get(url, username, password);
} catch (Exception e) {
throw new RuntimeException(e);
}
return rsp;
}
- public static String config(String address) {
+ public static String config(String address, String username, String password) {
String url = String.format("%s/api/config", address);
String rsp;
try {
- rsp = HttpUtils.get(url);
+ rsp = HttpUtils.get(url, username, password);
} catch (Exception e) {
throw new RuntimeException(e);
}
return rsp;
}
-
- public static boolean put(String address, List dps) {
- return put(address, JSON.toJSON(dps));
- }
-
- public static boolean put(String address, DataPoint4TSDB dp) {
- return put(address, JSON.toJSON(dp));
- }
-
- private static boolean put(String address, Object o) {
- String url = String.format("%s/api/put", address);
- String rsp;
- try {
- rsp = HttpUtils.post(url, o.toString());
- // If successful, the returned content should be null.
- assert rsp == null;
- } catch (Exception e) {
- LOGGER.error("Address: {}, DataPoints: {}", url, o);
- throw new RuntimeException(e);
- }
- return true;
- }
}
diff --git a/tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnectionTest.java b/tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnectionTest.java
index e4544088..6be291e8 100644
--- a/tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnectionTest.java
+++ b/tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/conn/TSDBConnectionTest.java
@@ -19,12 +19,12 @@ public class TSDBConnectionTest {
@Test
public void testVersion() {
- String version = new TSDBConnection(TSDB_ADDRESS).version();
+ String version = new TSDBConnection(TSDB_ADDRESS,null,null).version();
Assert.assertNotNull(version);
}
@Test
public void testIsSupported() {
- Assert.assertTrue(new TSDBConnection(TSDB_ADDRESS).isSupported());
+ Assert.assertTrue(new TSDBConnection(TSDB_ADDRESS,null,null).isSupported());
}
}
diff --git a/tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtilsTest.java b/tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtilsTest.java
deleted file mode 100644
index 12a2660a..00000000
--- a/tsdbreader/src/test/java/com/alibaba/datax/plugin/reader/tsdbreader/util/HttpUtilsTest.java
+++ /dev/null
@@ -1,39 +0,0 @@
-package com.alibaba.datax.plugin.reader.tsdbreader.util;
-
-import org.junit.Assert;
-import org.junit.Ignore;
-import org.junit.Test;
-
-import java.util.HashMap;
-import java.util.Map;
-
-/**
- * Copyright @ 2019 alibaba.com
- * All right reserved.
- * Function:HttpUtils Test
- *
- * @author Benedict Jin
- * @since 2019-10-21
- */
-@Ignore
-public class HttpUtilsTest {
-
- @Test
- public void testSimpleCase() throws Exception {
- String url = "https://httpbin.org/post";
- Map params = new HashMap<>();
- params.put("foo", "bar");
-
- String rsp = HttpUtils.post(url, params);
- System.out.println(rsp);
- Assert.assertNotNull(rsp);
- }
-
- @Test
- public void testGet() throws Exception {
- String url = String.format("%s/api/version", Const.TSDB_ADDRESS);
- String rsp = HttpUtils.get(url);
- System.out.println(rsp);
- Assert.assertNotNull(rsp);
- }
-}
diff --git a/tsdbwriter/pom.xml b/tsdbwriter/pom.xml
index 1fb7c1e0..fd4cc6f5 100644
--- a/tsdbwriter/pom.xml
+++ b/tsdbwriter/pom.xml
@@ -91,6 +91,13 @@
${fastjson.version}
+
+
+ com.aliyun
+ hitsdb-client
+ 0.4.0-SNAPSHOT
+
+
junit
diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/Connection4TSDB.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/Connection4TSDB.java
index 8119348d..ecb30055 100644
--- a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/Connection4TSDB.java
+++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/Connection4TSDB.java
@@ -21,6 +21,28 @@ public interface Connection4TSDB {
*/
String address();
+ /**
+ * Get the setted database name.
+ *
+ * @return database
+ */
+ String database();
+
+
+ /**
+ * Get the username of Database.
+ *
+ * @return username
+ */
+ String username();
+
+ /**
+ * Get the password of Database.
+ *
+ * @return password
+ */
+ String password();
+
/**
* Get the version of Database.
*
@@ -69,17 +91,25 @@ public interface Connection4TSDB {
boolean put(List dps);
/**
- * Put data points.
+ * Put data points with single field.
*
* @param dps data points
* @return whether the data point is written successfully
*/
boolean put(String dps);
+ /**
+ * Put data points with multi fields.
+ *
+ * @param dps data points
+ * @return whether the data point is written successfully
+ */
+ boolean mput(String dps);
+
/**
* Whether current version is supported.
*
* @return true: supported; false: not yet!
*/
boolean isSupported();
-}
+}
\ No newline at end of file
diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/TSDBConnection.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/TSDBConnection.java
index e4ebad7d..074f0295 100644
--- a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/TSDBConnection.java
+++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/conn/TSDBConnection.java
@@ -18,12 +18,18 @@ import java.util.List;
public class TSDBConnection implements Connection4TSDB {
private String address;
+ private String username;
+ private String password;
+ private String database;
- public TSDBConnection(String address) {
+ public TSDBConnection(String address, String database, String username, String password) {
if (StringUtils.isBlank(address)) {
throw new RuntimeException("TSDBConnection init failed because address is blank!");
}
this.address = address;
+ this.database = database;
+ this.username = username;
+ this.password = password;
}
@Override
@@ -31,14 +37,29 @@ public class TSDBConnection implements Connection4TSDB {
return address;
}
+ @Override
+ public String username() {
+ return username;
+ }
+
+ @Override
+ public String database() {
+ return database;
+ }
+
+ @Override
+ public String password() {
+ return password;
+ }
+
@Override
public String version() {
- return TSDBUtils.version(address);
+ return TSDBUtils.version(address, username, password);
}
@Override
public String config() {
- return TSDBUtils.config(address);
+ return TSDBUtils.config(address, username, password);
}
@Override
@@ -53,17 +74,22 @@ public class TSDBConnection implements Connection4TSDB {
@Override
public boolean put(DataPoint4TSDB dp) {
- return TSDBUtils.put(address, dp);
+ return TSDBUtils.put(address, database, username, password, dp);
}
@Override
public boolean put(List dps) {
- return TSDBUtils.put(address, dps);
+ return TSDBUtils.put(address, database, username, password, dps);
}
@Override
public boolean put(String dps) {
- return TSDBUtils.put(address, dps);
+ return TSDBUtils.put(address, database, username, password, dps);
+ }
+
+ @Override
+ public boolean mput(String dps) {
+ return TSDBUtils.mput(address, database, username, password, dps);
}
@Override
diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/Key.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/Key.java
index 2cc3f671..6cb239ec 100755
--- a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/Key.java
+++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/Key.java
@@ -10,8 +10,22 @@ package com.alibaba.datax.plugin.writer.tsdbwriter;
*/
public class Key {
+ static final String SOURCE_DB_TYPE = "sourceDbType";
+ static final String MULTI_FIELD = "multiField";
+
+ // common
static final String ENDPOINT = "endpoint";
+ static final String USERNAME = "username";
+ static final String PASSWORD = "password";
+ static final String IGNORE_WRITE_ERROR = "ignoreWriteError";
+ static final String DATABASE = "database";
+
+ // for tsdb
static final String BATCH_SIZE = "batchSize";
static final String MAX_RETRY_TIME = "maxRetryTime";
- static final String IGNORE_WRITE_ERROR = "ignoreWriteError";
+
+ // for rdb
+ static final String COLUMN = "column";
+ static final String COLUMN_TYPE = "columnType";
+ static final String TABLE = "table";
}
diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/SourceDBType.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/SourceDBType.java
new file mode 100644
index 00000000..792806a6
--- /dev/null
+++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/SourceDBType.java
@@ -0,0 +1,5 @@
+package com.alibaba.datax.plugin.writer.tsdbwriter;
+
+public enum SourceDBType {
+ TSDB, RDB
+}
diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBConverter.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBConverter.java
new file mode 100644
index 00000000..86e35c56
--- /dev/null
+++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBConverter.java
@@ -0,0 +1,96 @@
+package com.alibaba.datax.plugin.writer.tsdbwriter;
+
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.Record;
+import com.alibaba.fastjson.JSON;
+import com.aliyun.hitsdb.client.value.request.MultiFieldPoint;
+import com.aliyun.hitsdb.client.value.request.Point;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+class TSDBConverter {
+
+ private static final Logger LOG = LoggerFactory.getLogger(TSDBConverter.class);
+
+ private List columnName;
+ private List columnType;
+
+ TSDBConverter(List columnName, List columnType) {
+ this.columnName = columnName;
+ this.columnType = columnType;
+ LOG.info("columnName: {}, columnType: {}", JSON.toJSONString(columnName), JSON.toJSONString(columnType));
+ }
+
+ List transRecord2Point(List records) {
+ List dps = new ArrayList();
+ for (Record record : records) {
+ List metricBuilders = new ArrayList();
+ Map tags = new HashMap();
+ Long time = 0L;
+
+ for (int i = 0; i < columnType.size(); i++) {
+ String type = columnType.get(i);
+ String name = columnName.get(i);
+ Column column = record.getColumn(i);
+ if (TSDBModel.TSDB_TAG.equals(type)) {
+ tags.put(name, column.asString());
+ } else if (TSDBModel.TSDB_FIELD_DOUBLE.equals(type)) {
+ metricBuilders.add(new Point.MetricBuilder(name).value(column.asDouble()));
+ } else if (TSDBModel.TSDB_FIELD_STRING.equals(type)) {
+ metricBuilders.add(new Point.MetricBuilder(name).value(column.asString()));
+ } else if (TSDBModel.TSDB_FIELD_BOOL.equals(type)) {
+ metricBuilders.add(new Point.MetricBuilder(name).value(column.asBoolean()));
+ } else if (TSDBModel.TSDB_TIMESTAMP.equals(type)) {
+ time = column.asLong();
+ } else if (TSDBModel.TSDB_METRIC_NUM.equals(type)) {
+ // compatible with previous usage of TSDB_METRIC_NUM
+ metricBuilders.add(new Point.MetricBuilder(name).value(column.asDouble()));
+ } else if (TSDBModel.TSDB_METRIC_STRING.equals(type)) {
+ // compatible with previous usage of TSDB_METRIC_STRING
+ metricBuilders.add(new Point.MetricBuilder(name).value(column.asString()));
+ }
+ }
+ for (Point.MetricBuilder metricBuilder : metricBuilders) {
+ dps.add(metricBuilder.tag(tags).timestamp(time).build(false));
+ }
+ }
+ return dps;
+ }
+
+ List transRecord2MultiFieldPoint(List records, String tableName) {
+ List dps = new ArrayList();
+ for (Record record : records) {
+ MultiFieldPoint.MetricBuilder builder = MultiFieldPoint.metric(tableName);
+ for (int i = 0; i < columnType.size(); i++) {
+ String type = columnType.get(i);
+ String name = columnName.get(i);
+ Column column = record.getColumn(i);
+ if (TSDBModel.TSDB_TAG.equals(type)) {
+ builder.tag(name, column.asString());
+ } else if (TSDBModel.TSDB_FIELD_DOUBLE.equals(type)) {
+ builder.field(name, column.asDouble());
+ } else if (TSDBModel.TSDB_FIELD_STRING.equals(type)) {
+ builder.field(name, column.asString());
+ } else if (TSDBModel.TSDB_FIELD_BOOL.equals(type)) {
+ builder.field(name, column.asBoolean());
+ } else if (TSDBModel.TSDB_TIMESTAMP.equals(type)) {
+ builder.timestamp(column.asLong());
+ } else if (TSDBModel.TSDB_METRIC_NUM.equals(type)) {
+ // compatible with previous usage of TSDB_METRIC_NUM
+ builder.field(name, column.asDouble());
+ } else if (TSDBModel.TSDB_METRIC_STRING.equals(type)) {
+ // compatible with previous usage of TSDB_METRIC_STRING
+ builder.field(name, column.asString());
+ }
+ }
+ MultiFieldPoint point = builder.build(false);
+ dps.add(point);
+ }
+ return dps;
+ }
+}
diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBModel.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBModel.java
new file mode 100644
index 00000000..ead0e2cc
--- /dev/null
+++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBModel.java
@@ -0,0 +1,11 @@
+package com.alibaba.datax.plugin.writer.tsdbwriter;
+
+class TSDBModel {
+ static final String TSDB_METRIC_NUM = "metric_num";
+ static final String TSDB_METRIC_STRING = "metric_string";
+ static final String TSDB_TAG = "tag";
+ static final String TSDB_TIMESTAMP = "timestamp";
+ static final String TSDB_FIELD_DOUBLE = "field_double";
+ static final String TSDB_FIELD_STRING = "field_string";
+ static final String TSDB_FIELD_BOOL = "field_bool";
+}
\ No newline at end of file
diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriter.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriter.java
index e410b2ba..85a32a07 100755
--- a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriter.java
+++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriter.java
@@ -7,12 +7,20 @@ import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.common.util.RetryUtil;
import com.alibaba.datax.plugin.writer.conn.TSDBConnection;
+import com.aliyun.hitsdb.client.TSDB;
+import com.aliyun.hitsdb.client.TSDBClientFactory;
+import com.aliyun.hitsdb.client.TSDBConfig;
+import com.aliyun.hitsdb.client.value.request.MultiFieldPoint;
+import com.aliyun.hitsdb.client.value.request.Point;
+import com.aliyun.hitsdb.client.value.response.batch.IgnoreErrorsResult;
+import com.aliyun.hitsdb.client.value.response.batch.MultiFieldIgnoreErrorsResult;
+import com.aliyun.hitsdb.client.value.response.batch.SummaryResult;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.ArrayList;
-import java.util.List;
+import java.io.IOException;
+import java.util.*;
import java.util.concurrent.Callable;
/**
@@ -26,6 +34,9 @@ import java.util.concurrent.Callable;
@SuppressWarnings("unused")
public class TSDBWriter extends Writer {
+ private static SourceDBType DB_TYPE;
+ private static TSDB tsdb = null;
+
public static class Job extends Writer.Job {
private static final Logger LOG = LoggerFactory.getLogger(Job.class);
@@ -34,33 +45,99 @@ public class TSDBWriter extends Writer {
@Override
public void init() {
- this.originalConfig = super.getPluginJobConf();
+ originalConfig = super.getPluginJobConf();
- String address = this.originalConfig.getString(Key.ENDPOINT);
- if (StringUtils.isBlank(address)) {
+ // check source db type
+ String sourceDbType = originalConfig.getString(Key.SOURCE_DB_TYPE);
+ if (StringUtils.isBlank(sourceDbType)) {
+ sourceDbType = SourceDBType.TSDB.name();
+ originalConfig.set(Key.SOURCE_DB_TYPE, sourceDbType);
+ LOG.info("The parameter [" + Key.SOURCE_DB_TYPE + "] will be default value: " + SourceDBType.TSDB);
+ }
+ try {
+ DB_TYPE = SourceDBType.valueOf(sourceDbType);
+ } catch (Exception e) {
throw DataXException.asDataXException(TSDBWriterErrorCode.REQUIRED_VALUE,
- "The parameter [" + Key.ENDPOINT + "] is not set.");
+ "The parameter [" + Key.SOURCE_DB_TYPE +
+ "] is invalid, which should be one of [" + Arrays.toString(SourceDBType.values()) + "].");
}
- Integer batchSize = this.originalConfig.getInt(Key.BATCH_SIZE);
- if (batchSize == null || batchSize < 1) {
- originalConfig.set(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_SIZE);
- LOG.info("The parameter [" + Key.BATCH_SIZE +
- "] will be default value: " + Constant.DEFAULT_BATCH_SIZE);
- }
+ // for tsdb
+ if (DB_TYPE == SourceDBType.TSDB) {
+ String address = originalConfig.getString(Key.ENDPOINT);
+ if (StringUtils.isBlank(address)) {
+ throw DataXException.asDataXException(TSDBWriterErrorCode.REQUIRED_VALUE,
+ "The parameter [" + Key.ENDPOINT + "] is not set.");
+ }
- Integer retrySize = this.originalConfig.getInt(Key.MAX_RETRY_TIME);
- if (retrySize == null || retrySize < 0) {
- originalConfig.set(Key.MAX_RETRY_TIME, Constant.DEFAULT_TRY_SIZE);
- LOG.info("The parameter [" + Key.MAX_RETRY_TIME +
- "] will be default value: " + Constant.DEFAULT_TRY_SIZE);
- }
+ String username = originalConfig.getString(Key.USERNAME, null);
+ if (StringUtils.isBlank(username)) {
+ LOG.warn("The parameter [" + Key.USERNAME + "] is blank.");
+ }
+ String password = originalConfig.getString(Key.PASSWORD, null);
+ if (StringUtils.isBlank(password)) {
+ LOG.warn("The parameter [" + Key.PASSWORD + "] is blank.");
+ }
+
+ Integer batchSize = originalConfig.getInt(Key.BATCH_SIZE);
+ if (batchSize == null || batchSize < 1) {
+ originalConfig.set(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_SIZE);
+ LOG.info("The parameter [" + Key.BATCH_SIZE +
+ "] will be default value: " + Constant.DEFAULT_BATCH_SIZE);
+ }
+
+ Integer retrySize = originalConfig.getInt(Key.MAX_RETRY_TIME);
+ if (retrySize == null || retrySize < 0) {
+ originalConfig.set(Key.MAX_RETRY_TIME, Constant.DEFAULT_TRY_SIZE);
+ LOG.info("The parameter [" + Key.MAX_RETRY_TIME +
+ "] will be default value: " + Constant.DEFAULT_TRY_SIZE);
+ }
+
+ Boolean ignoreWriteError = originalConfig.getBool(Key.IGNORE_WRITE_ERROR);
+ if (ignoreWriteError == null) {
+ originalConfig.set(Key.IGNORE_WRITE_ERROR, Constant.DEFAULT_IGNORE_WRITE_ERROR);
+ LOG.info("The parameter [" + Key.IGNORE_WRITE_ERROR +
+ "] will be default value: " + Constant.DEFAULT_IGNORE_WRITE_ERROR);
+ }
+ } else if (DB_TYPE == SourceDBType.RDB) {
+ // for rdb
+ originalConfig.getNecessaryValue(Key.ENDPOINT, TSDBWriterErrorCode.REQUIRED_VALUE);
+ originalConfig.getNecessaryValue(Key.COLUMN_TYPE, TSDBWriterErrorCode.REQUIRED_VALUE);
+ originalConfig.getNecessaryValue(Key.COLUMN, TSDBWriterErrorCode.REQUIRED_VALUE);
+ String endpoint = originalConfig.getString(Key.ENDPOINT);
+ String[] split = endpoint.split(":");
+ if (split.length != 3) {
+ throw DataXException.asDataXException(TSDBWriterErrorCode.REQUIRED_VALUE,
+ "The parameter [" + Key.ENDPOINT + "] is invalid, which should be [http://IP:Port].");
+ }
+ String ip = split[1].substring(2);
+ int port = Integer.parseInt(split[2]);
+
+ String username = originalConfig.getString(Key.USERNAME, null);
+ if (StringUtils.isBlank(username)) {
+ LOG.warn("The parameter [" + Key.USERNAME + "] is blank.");
+ }
+
+ String password = originalConfig.getString(Key.PASSWORD, null);
+ if (StringUtils.isBlank(password)) {
+ LOG.warn("The parameter [" + Key.PASSWORD + "] is blank.");
+ }
+
+ if (!StringUtils.isBlank(password) && !StringUtils.isBlank(username)) {
+ tsdb = TSDBClientFactory.connect(TSDBConfig.address(ip, port).basicAuth(username, password).config());
+ } else {
+ tsdb = TSDBClientFactory.connect(TSDBConfig.address(ip, port).config());
+ }
+
+ String database = originalConfig.getString(Key.DATABASE, null);
+ if (StringUtils.isBlank(database)) {
+ LOG.info("The parameter [" + Key.DATABASE + "] is blank.");
+ } else {
+ tsdb.useDatabase(database);
+ }
+
+ LOG.info("Tsdb config:" + originalConfig.toJSON());
- Boolean ignoreWriteError = this.originalConfig.getBool(Key.IGNORE_WRITE_ERROR);
- if (ignoreWriteError == null) {
- originalConfig.set(Key.IGNORE_WRITE_ERROR, Constant.DEFAULT_IGNORE_WRITE_ERROR);
- LOG.info("The parameter [" + Key.IGNORE_WRITE_ERROR +
- "] will be default value: " + Constant.DEFAULT_IGNORE_WRITE_ERROR);
}
}
@@ -72,7 +149,7 @@ public class TSDBWriter extends Writer {
public List split(int mandatoryNumber) {
ArrayList configurations = new ArrayList(mandatoryNumber);
for (int i = 0; i < mandatoryNumber; i++) {
- configurations.add(this.originalConfig.clone());
+ configurations.add(originalConfig.clone());
}
return configurations;
}
@@ -83,6 +160,14 @@ public class TSDBWriter extends Writer {
@Override
public void destroy() {
+ if (DB_TYPE == SourceDBType.RDB) {
+ if (tsdb != null) {
+ try {
+ tsdb.close();
+ } catch (IOException ignored) {
+ }
+ }
+ }
}
}
@@ -91,18 +176,87 @@ public class TSDBWriter extends Writer {
private static final Logger LOG = LoggerFactory.getLogger(Task.class);
private TSDBConnection conn;
+ private boolean multiField;
private int batchSize;
private int retrySize;
private boolean ignoreWriteError;
+ private String tableName;
+ private TSDBConverter tsdbConverter;
@Override
public void init() {
Configuration writerSliceConfig = getPluginJobConf();
- String address = writerSliceConfig.getString(Key.ENDPOINT);
- this.conn = new TSDBConnection(address);
- this.batchSize = writerSliceConfig.getInt(Key.BATCH_SIZE);
- this.retrySize = writerSliceConfig.getInt(Key.MAX_RETRY_TIME);
+
+ // single field | multi fields
+ this.multiField = writerSliceConfig.getBool(Key.MULTI_FIELD, false);
this.ignoreWriteError = writerSliceConfig.getBool(Key.IGNORE_WRITE_ERROR);
+
+ // for tsdb
+ if (DB_TYPE == SourceDBType.TSDB) {
+ String address = writerSliceConfig.getString(Key.ENDPOINT);
+ String database = writerSliceConfig.getString(Key.DATABASE);
+ String username = writerSliceConfig.getString(Key.USERNAME);
+ String password = writerSliceConfig.getString(Key.PASSWORD);
+ this.conn = new TSDBConnection(address, database, username, password);
+ this.batchSize = writerSliceConfig.getInt(Key.BATCH_SIZE);
+ this.retrySize = writerSliceConfig.getInt(Key.MAX_RETRY_TIME);
+
+ } else if (DB_TYPE == SourceDBType.RDB) {
+ // for rdb
+ int timeSize = 0;
+ int fieldSize = 0;
+ int tagSize = 0;
+ batchSize = writerSliceConfig.getInt(Key.BATCH_SIZE, 100);
+ List columnName = writerSliceConfig.getList(Key.COLUMN, String.class);
+ List columnType = writerSliceConfig.getList(Key.COLUMN_TYPE, String.class);
+ Set typeSet = new HashSet(columnType);
+ if (columnName.size() != columnType.size()) {
+ throw DataXException.asDataXException(TSDBWriterErrorCode.ILLEGAL_VALUE,
+ "The parameter [" + Key.COLUMN_TYPE + "] should has same length with [" + Key.COLUMN + "].");
+ }
+
+ for (String type : columnType) {
+ if (TSDBModel.TSDB_TAG.equals(type)) {
+ tagSize ++;
+ } else if (TSDBModel.TSDB_FIELD_DOUBLE.equals(type) || TSDBModel.TSDB_FIELD_STRING.equals(type)
+ || TSDBModel.TSDB_FIELD_BOOL.equals(type)) {
+ fieldSize++;
+ } else if (TSDBModel.TSDB_TIMESTAMP.equals(type)) {
+ timeSize++;
+ }
+ }
+
+ if (fieldSize == 0) {
+ // compatible with previous usage of TSDB_METRIC_NUM and TSDB_METRIC_STRING
+ if (!typeSet.contains(TSDBModel.TSDB_METRIC_NUM) && !typeSet.contains(TSDBModel.TSDB_METRIC_STRING)) {
+ throw DataXException.asDataXException(TSDBWriterErrorCode.ILLEGAL_VALUE,
+ "The parameter [" + Key.COLUMN_TYPE + "] is invalid, must set at least one of "
+ + TSDBModel.TSDB_FIELD_DOUBLE + ", " + TSDBModel.TSDB_FIELD_STRING + " or " + TSDBModel.TSDB_FIELD_BOOL + ".");
+ }
+ }
+
+ if (tagSize == 0) {
+ throw DataXException.asDataXException(TSDBWriterErrorCode.ILLEGAL_VALUE,
+ "The parameter [" + Key.COLUMN_TYPE + "] is invalid, must set " + TSDBModel.TSDB_TAG + ". ");
+ }
+
+ if (timeSize != 1) {
+ throw DataXException.asDataXException(TSDBWriterErrorCode.ILLEGAL_VALUE,
+ "The parameter [" + Key.COLUMN_TYPE + "] is invalid, must set one and only one "
+ + TSDBModel.TSDB_TIMESTAMP + ".");
+ }
+
+ if (multiField) {
+ // check source db type
+ tableName = writerSliceConfig.getString(Key.TABLE);
+ if (StringUtils.isBlank(tableName)) {
+ throw DataXException.asDataXException(TSDBWriterErrorCode.ILLEGAL_VALUE,
+ "The parameter [" + Key.TABLE + "] h must set when use multi field input.");
+ }
+ }
+ tsdbConverter = new TSDBConverter(columnName, columnType);
+
+ }
}
@Override
@@ -111,30 +265,52 @@ public class TSDBWriter extends Writer {
@Override
public void startWrite(RecordReceiver recordReceiver) {
- try {
- Record lastRecord = null;
- Record record;
- int count = 0;
- StringBuilder dps = new StringBuilder();
- while ((record = recordReceiver.getFromReader()) != null) {
- final int recordLength = record.getColumnNumber();
- for (int i = 0; i < recordLength; i++) {
- dps.append(record.getColumn(i).asString());
- dps.append(",");
- count++;
- if (count == batchSize) {
- count = 0;
- batchPut(record, "[" + dps.substring(0, dps.length() - 1) + "]");
- dps = new StringBuilder();
+ // for tsdb
+ if (DB_TYPE == SourceDBType.TSDB) {
+ try {
+ Record lastRecord = null;
+ Record record;
+ int count = 0;
+ StringBuilder dps = new StringBuilder();
+ while ((record = recordReceiver.getFromReader()) != null) {
+ final int recordLength = record.getColumnNumber();
+ for (int i = 0; i < recordLength; i++) {
+ dps.append(record.getColumn(i).asString());
+ dps.append(",");
+ count++;
+ if (count == batchSize) {
+ count = 0;
+ batchPut(record, "[" + dps.substring(0, dps.length() - 1) + "]");
+ dps = new StringBuilder();
+ }
}
+ lastRecord = record;
}
- lastRecord = record;
+ if (StringUtils.isNotBlank(dps.toString())) {
+ batchPut(lastRecord, "[" + dps.substring(0, dps.length() - 1) + "]");
+ }
+ } catch (Exception e) {
+ throw DataXException.asDataXException(TSDBWriterErrorCode.RUNTIME_EXCEPTION, e);
}
- if (StringUtils.isNotBlank(dps.toString())) {
- batchPut(lastRecord, "[" + dps.substring(0, dps.length() - 1) + "]");
+ } else if (DB_TYPE == SourceDBType.RDB) {
+ // for rdb
+ List writerBuffer = new ArrayList(this.batchSize);
+ Record record;
+ long total = 0;
+ while ((record = recordReceiver.getFromReader()) != null) {
+ writerBuffer.add(record);
+ if (writerBuffer.size() >= this.batchSize) {
+ total += doBatchInsert(writerBuffer);
+ writerBuffer.clear();
+ }
}
- } catch (Exception e) {
- throw DataXException.asDataXException(TSDBWriterErrorCode.RUNTIME_EXCEPTION, e);
+ if (!writerBuffer.isEmpty()) {
+ total += doBatchInsert(writerBuffer);
+ writerBuffer.clear();
+ }
+ getTaskPluginCollector().collectMessage("write size", total + "");
+ LOG.info("Task finished, write size: {}", total);
+
}
}
@@ -143,12 +319,13 @@ public class TSDBWriter extends Writer {
RetryUtil.executeWithRetry(new Callable() {
@Override
public Integer call() {
- if (!conn.put(dps)) {
- getTaskPluginCollector().collectDirtyRecord(record, "Put data points failed!");
- throw DataXException.asDataXException(TSDBWriterErrorCode.RUNTIME_EXCEPTION,
- "Put data points failed!");
+ final boolean success = multiField ? conn.mput(dps) : conn.put(dps);
+ if (success) {
+ return 0;
}
- return 0;
+ getTaskPluginCollector().collectDirtyRecord(record, "Put data points failed!");
+ throw DataXException.asDataXException(TSDBWriterErrorCode.RUNTIME_EXCEPTION,
+ "Put data points failed!");
}
}, retrySize, 60000L, true);
} catch (Exception e) {
@@ -160,6 +337,47 @@ public class TSDBWriter extends Writer {
}
}
+ private long doBatchInsert(final List writerBuffer) {
+ int size;
+ if (ignoreWriteError) {
+ if (multiField) {
+ List points = tsdbConverter.transRecord2MultiFieldPoint(writerBuffer, tableName);
+ size = points.size();
+ MultiFieldIgnoreErrorsResult ignoreErrorsResult = tsdb.multiFieldPutSync(points, MultiFieldIgnoreErrorsResult.class);
+ if (ignoreErrorsResult == null) {
+ LOG.error("Unexpected inner error for insert");
+ } else if (ignoreErrorsResult.getFailed() > 0) {
+ LOG.error("write TSDB failed num:" + ignoreErrorsResult.getFailed());
+ }
+ } else {
+ List points = tsdbConverter.transRecord2Point(writerBuffer);
+ size = points.size();
+ IgnoreErrorsResult ignoreErrorsResult = tsdb.putSync(points, IgnoreErrorsResult.class);
+ if (ignoreErrorsResult == null) {
+ LOG.error("Unexpected inner error for insert");
+ } else if (ignoreErrorsResult.getFailed() > 0) {
+ LOG.error("write TSDB failed num:" + ignoreErrorsResult.getFailed());
+ }
+ }
+ } else {
+ SummaryResult summaryResult;
+ if (multiField) {
+ List points = tsdbConverter.transRecord2MultiFieldPoint(writerBuffer, tableName);
+ size = points.size();
+ summaryResult = tsdb.multiFieldPutSync(points, SummaryResult.class);
+ } else {
+ List points = tsdbConverter.transRecord2Point(writerBuffer);
+ size = points.size();
+ summaryResult = tsdb.putSync(points, SummaryResult.class);
+ }
+ if (summaryResult.getFailed() > 0) {
+ LOG.error("write TSDB failed num:" + summaryResult.getFailed());
+ throw DataXException.asDataXException(TSDBWriterErrorCode.RUNTIME_EXCEPTION, "Write TSDB failed", new Exception());
+ }
+ }
+ return size;
+ }
+
@Override
public void post() {
}
diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriterErrorCode.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriterErrorCode.java
index f907fb67..ab4c3894 100755
--- a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriterErrorCode.java
+++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/tsdbwriter/TSDBWriterErrorCode.java
@@ -13,6 +13,7 @@ import com.alibaba.datax.common.spi.ErrorCode;
public enum TSDBWriterErrorCode implements ErrorCode {
REQUIRED_VALUE("TSDBWriter-00", "Missing the necessary value"),
+ ILLEGAL_VALUE("TSDBWriter-01", "Illegal value"),
RUNTIME_EXCEPTION("TSDBWriter-01", "Runtime exception"),
RETRY_WRITER_EXCEPTION("TSDBWriter-02", "After repeated attempts, the write still fails");
diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/HttpUtils.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/HttpUtils.java
index b81512f7..29b14dab 100644
--- a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/HttpUtils.java
+++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/HttpUtils.java
@@ -1,11 +1,14 @@
package com.alibaba.datax.plugin.writer.util;
import com.alibaba.fastjson.JSON;
+import org.apache.commons.lang3.StringUtils;
import org.apache.http.client.fluent.Content;
import org.apache.http.client.fluent.Request;
import org.apache.http.entity.ContentType;
import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.util.Base64;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -19,43 +22,44 @@ import java.util.concurrent.TimeUnit;
*/
public final class HttpUtils {
- public final static Charset UTF_8 = Charset.forName("UTF-8");
public final static int CONNECT_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60);
public final static int SOCKET_TIMEOUT_DEFAULT_IN_MILL = (int) TimeUnit.SECONDS.toMillis(60);
+ private static final String CREDENTIALS_FORMAT = "%s:%s";
+ private static final String BASIC_AUTHENTICATION_FORMAT = "Basic %s";
+
private HttpUtils() {
}
- public static String get(String url) throws Exception {
- Content content = Request.Get(url)
+ public static String get(String url, String username, String password) throws Exception {
+ final Request request = Request.Get(url)
.connectTimeout(CONNECT_TIMEOUT_DEFAULT_IN_MILL)
- .socketTimeout(SOCKET_TIMEOUT_DEFAULT_IN_MILL)
+ .socketTimeout(SOCKET_TIMEOUT_DEFAULT_IN_MILL);
+ addAuth(request, username, password);
+ Content content = request
.execute()
.returnContent();
if (content == null) {
return null;
}
- return content.asString(UTF_8);
+ return content.asString(StandardCharsets.UTF_8);
}
- public static String post(String url, Map params) throws Exception {
- return post(url, JSON.toJSONString(params), CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
+ public static String post(String url, String username, String password, Map params) throws Exception {
+ return post(url, username, password, JSON.toJSONString(params), CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
}
- public static String post(String url, String params) throws Exception {
- return post(url, params, CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
+ public static String post(String url, String username, String password, String params) throws Exception {
+ return post(url, username, password, params, CONNECT_TIMEOUT_DEFAULT_IN_MILL, SOCKET_TIMEOUT_DEFAULT_IN_MILL);
}
- public static String post(String url, Map params,
+ public static String post(String url, String username, String password, String params,
int connectTimeoutInMill, int socketTimeoutInMill) throws Exception {
- return post(url, JSON.toJSONString(params), connectTimeoutInMill, socketTimeoutInMill);
- }
-
- public static String post(String url, String params,
- int connectTimeoutInMill, int socketTimeoutInMill) throws Exception {
- Content content = Request.Post(url)
+ Request request = Request.Post(url)
.connectTimeout(connectTimeoutInMill)
- .socketTimeout(socketTimeoutInMill)
+ .socketTimeout(socketTimeoutInMill);
+ addAuth(request, username, password);
+ Content content = request
.addHeader("Content-Type", "application/json")
.bodyString(params, ContentType.APPLICATION_JSON)
.execute()
@@ -63,6 +67,22 @@ public final class HttpUtils {
if (content == null) {
return null;
}
- return content.asString(UTF_8);
+ return content.asString(StandardCharsets.UTF_8);
+ }
+
+ private static void addAuth(Request request, String username, String password) {
+ String authorization = generateHttpAuthorization(username, password);
+ if (authorization != null) {
+ request.setHeader("Authorization", authorization);
+ }
+ }
+
+ private static String generateHttpAuthorization(String username, String password) {
+ if (StringUtils.isBlank(username) || StringUtils.isBlank(password)) {
+ return null;
+ }
+ String credentials = String.format(CREDENTIALS_FORMAT, username, password);
+ credentials = Base64.getEncoder().encodeToString(credentials.getBytes());
+ return String.format(BASIC_AUTHENTICATION_FORMAT, credentials);
}
}
diff --git a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/TSDBUtils.java b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/TSDBUtils.java
index ed01d877..d57c5935 100644
--- a/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/TSDBUtils.java
+++ b/tsdbwriter/src/main/java/com/alibaba/datax/plugin/writer/util/TSDBUtils.java
@@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.writer.util;
import com.alibaba.datax.plugin.writer.conn.DataPoint4TSDB;
import com.alibaba.fastjson.JSON;
+import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -22,45 +23,56 @@ public final class TSDBUtils {
private TSDBUtils() {
}
- public static String version(String address) {
+ public static String version(String address, String username, String password) {
String url = String.format("%s/api/version", address);
String rsp;
try {
- rsp = HttpUtils.get(url);
+ rsp = HttpUtils.get(url, username, password);
} catch (Exception e) {
throw new RuntimeException(e);
}
return rsp;
}
- public static String config(String address) {
+ public static String config(String address, String username, String password) {
String url = String.format("%s/api/config", address);
String rsp;
try {
- rsp = HttpUtils.get(url);
+ rsp = HttpUtils.get(url, username, password);
} catch (Exception e) {
throw new RuntimeException(e);
}
return rsp;
}
- public static boolean put(String address, List dps) {
- return put(address, JSON.toJSON(dps));
+ public static boolean put(String address, String database, String username, String password, List dps) {
+ return put(address, database, username, password, JSON.toJSON(dps));
}
- public static boolean put(String address, DataPoint4TSDB dp) {
- return put(address, JSON.toJSON(dp));
+ public static boolean put(String address, String database, String username, String password, DataPoint4TSDB dp) {
+ return put(address, database, username, password, JSON.toJSON(dp));
}
- private static boolean put(String address, Object o) {
- return put(address, o.toString());
+ private static boolean put(String address, String database, String username, String password, Object o) {
+ return put(address, database, username, password, o.toString());
}
- public static boolean put(String address, String s) {
- String url = String.format("%s/api/put", address);
+ public static boolean put(String address, String database, String username, String password, String s) {
+ return put(address, database, username, password, s, false);
+ }
+
+ public static boolean mput(String address, String database, String username, String password, String s) {
+ return put(address, database, username, password, s, true);
+ }
+
+ public static boolean put(String address, String database, String username, String password, String s, boolean multiField) {
+ String url = address + (multiField ? "/api/mput" : "/api/put");
+ if (!StringUtils.isBlank(database)) {
+ url = url.concat("?db=" + database);
+ }
String rsp;
try {
- rsp = HttpUtils.post(url, s);
+ rsp = HttpUtils.post(url, username, password, s);
// If successful, the returned content should be null.
assert rsp == null;
} catch (Exception e) {
diff --git a/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/conn/TSDBConnectionTest.java b/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/conn/TSDBConnectionTest.java
index 455f4ce6..fada706e 100644
--- a/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/conn/TSDBConnectionTest.java
+++ b/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/conn/TSDBConnectionTest.java
@@ -19,12 +19,12 @@ public class TSDBConnectionTest {
@Test
public void testVersion() {
- String version = new TSDBConnection(TSDB_ADDRESS).version();
+ String version = new TSDBConnection(TSDB_ADDRESS,null,null,null).version();
Assert.assertNotNull(version);
}
@Test
public void testIsSupported() {
- Assert.assertTrue(new TSDBConnection(TSDB_ADDRESS).isSupported());
+ Assert.assertTrue(new TSDBConnection(TSDB_ADDRESS,null,null,null).isSupported());
}
}
diff --git a/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/HttpUtilsTest.java b/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/HttpUtilsTest.java
index 69f26b80..1f8fb870 100644
--- a/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/HttpUtilsTest.java
+++ b/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/HttpUtilsTest.java
@@ -24,7 +24,7 @@ public class HttpUtilsTest {
Map params = new HashMap();
params.put("foo", "bar");
- String rsp = HttpUtils.post(url, params);
+ String rsp = HttpUtils.post(url, null,null,params);
System.out.println(rsp);
Assert.assertNotNull(rsp);
}
@@ -32,7 +32,7 @@ public class HttpUtilsTest {
@Test
public void testGet() throws Exception {
String url = String.format("%s/api/version", Const.OPENTSDB_ADDRESS);
- String rsp = HttpUtils.get(url);
+ String rsp = HttpUtils.get(url,null,null);
System.out.println(rsp);
Assert.assertNotNull(rsp);
}
diff --git a/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/TSDBTest.java b/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/TSDBTest.java
index 7d22bb72..8debf406 100644
--- a/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/TSDBTest.java
+++ b/tsdbwriter/src/test/java/com/alibaba/datax/plugin/writer/util/TSDBTest.java
@@ -17,11 +17,11 @@ public class TSDBTest {
@Test
public void testVersion() {
- String version = TSDBUtils.version(Const.TSDB_ADDRESS);
+ String version = TSDBUtils.version(Const.TSDB_ADDRESS,null,null);
Assert.assertNotNull(version);
System.out.println(version);
- version = TSDBUtils.version(Const.OPENTSDB_ADDRESS);
+ version = TSDBUtils.version(Const.OPENTSDB_ADDRESS,null,null);
Assert.assertNotNull(version);
System.out.println(version);
}