From db0333b9712ad98088a17d3ab4501d615d338699 Mon Sep 17 00:00:00 2001 From: "bake.snn" Date: Mon, 11 Mar 2019 11:19:41 +0800 Subject: [PATCH] =?UTF-8?q?=E6=94=AF=E6=8C=81Phoenix5.x=E7=89=88=E6=9C=AC?= =?UTF-8?q?=E8=AF=BB=E5=86=99=E6=8F=92=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hbase20xsqlreader/doc/hbase20xsqlreader.md | 164 +++++++ hbase20xsqlreader/pom.xml | 116 +++++ .../src/main/assembly/package.xml | 35 ++ .../reader/hbase20xsqlreader/Constant.java | 28 ++ .../HBase20SQLReaderHelper.java | 403 ++++++++++++++++++ .../hbase20xsqlreader/HBase20xSQLReader.java | 53 +++ .../HBase20xSQLReaderErrorCode.java | 39 ++ .../HBase20xSQLReaderTask.java | 121 ++++++ .../plugin/reader/hbase20xsqlreader/Key.java | 40 ++ .../src/main/resources/plugin.json | 7 + .../main/resources/plugin_job_template.json | 13 + hbase20xsqlwriter/doc/hbase20xsqlwriter.md | 164 +++++++ hbase20xsqlwriter/pom.xml | 107 +++++ .../src/main/assembly/package.xml | 35 ++ .../writer/hbase20xsqlwriter/Constant.java | 17 + .../hbase20xsqlwriter/HBase20xSQLHelper.java | 142 ++++++ .../hbase20xsqlwriter/HBase20xSQLWriter.java | 58 +++ .../HBase20xSQLWriterErrorCode.java | 37 ++ .../HBase20xSQLWriterTask.java | 389 +++++++++++++++++ .../plugin/writer/hbase20xsqlwriter/Key.java | 36 ++ .../hbase20xsqlwriter/NullModeType.java | 32 ++ .../src/main/resources/plugin.json | 7 + .../main/resources/plugin_job_template.json | 13 + package.xml | 14 + pom.xml | 2 + 25 files changed, 2072 insertions(+) create mode 100644 hbase20xsqlreader/doc/hbase20xsqlreader.md create mode 100644 hbase20xsqlreader/pom.xml create mode 100644 hbase20xsqlreader/src/main/assembly/package.xml create mode 100644 hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/Constant.java create mode 100644 hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/HBase20SQLReaderHelper.java create mode 100644 hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/HBase20xSQLReader.java create mode 100644 hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/HBase20xSQLReaderErrorCode.java create mode 100644 hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/HBase20xSQLReaderTask.java create mode 100644 hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/Key.java create mode 100644 hbase20xsqlreader/src/main/resources/plugin.json create mode 100644 hbase20xsqlreader/src/main/resources/plugin_job_template.json create mode 100644 hbase20xsqlwriter/doc/hbase20xsqlwriter.md create mode 100644 hbase20xsqlwriter/pom.xml create mode 100755 hbase20xsqlwriter/src/main/assembly/package.xml create mode 100755 hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/Constant.java create mode 100644 hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLHelper.java create mode 100644 hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLWriter.java create mode 100644 hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLWriterErrorCode.java create mode 100644 hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLWriterTask.java create mode 100644 hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/Key.java create mode 100644 
hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/NullModeType.java create mode 100755 hbase20xsqlwriter/src/main/resources/plugin.json create mode 100644 hbase20xsqlwriter/src/main/resources/plugin_job_template.json diff --git a/hbase20xsqlreader/doc/hbase20xsqlreader.md b/hbase20xsqlreader/doc/hbase20xsqlreader.md new file mode 100644 index 00000000..9df020cc --- /dev/null +++ b/hbase20xsqlreader/doc/hbase20xsqlreader.md @@ -0,0 +1,164 @@ +# hbase20xsqlreader 插件文档 + + +___ + + + +## 1 快速介绍 + +hbase20xsqlreader插件实现了从Phoenix(HBase SQL)读取数据,对应版本为HBase2.X和Phoenix5.X。 + +## 2 实现原理 + +简而言之,hbase20xsqlreader通过Phoenix轻客户端去连接Phoenix QueryServer,并根据用户配置信息生成查询SELECT 语句,然后发送到QueryServer读取HBase数据,并将返回结果使用DataX自定义的数据类型拼装为抽象的数据集,最终传递给下游Writer处理。 + +## 3 功能说明 + +### 3.1 配置样例 + +* 配置一个从Phoenix同步抽取数据到本地的作业: + +``` +{ + "job": { + "content": [ + { + "reader": { + "name": "hbase20xsqlreader", //指定插件为hbase20xsqlreader + "parameter": { + "queryServerAddress": "http://127.0.0.1:8765", //填写连接Phoenix QueryServer地址 + "serialization": "PROTOBUF", //QueryServer序列化格式 + "table": "TEST", //读取表名 + "column": ["ID", "NAME"], //所要读取列名 + "splitKey": "ID" //切分列,必须是表主键 + } + }, + "writer": { + "name": "streamwriter", + "parameter": { + "encoding": "UTF-8", + "print": true + } + } + } + ], + "setting": { + "speed": { + "channel": "3" + } + } + } +} +``` + + +### 3.2 参数说明 + +* **queryServerAddress** + + * 描述:hbase20xsqlreader需要通过Phoenix轻客户端去连接Phoenix QueryServer,因此这里需要填写对应QueryServer地址。 + + * 必选:是
+ + * 默认值:无
+ +* **serialization** + + * 描述:QueryServer使用的序列化协议 + + * 必选:否
+ + * 默认值:PROTOBUF
+ +* **table** + + * 描述:所要读取表名 + + * 必选:是
+ + * 默认值:无
+ +* **schema** + + * 描述:表所在的schema + + * 必选:否
+ + * 默认值:无
+ +* **column** + + * 描述:填写需要从phoenix表中读取的列名集合,使用JSON的数组描述字段信息,空值表示读取所有列。 + + * 必选: 否
+ + * 默认值:全部列
+
+* **splitKey**
+
+	* 描述:读取表时对表进行切分并行读取,切分时有两种方式:1.根据该列的最大最小值按照指定channel个数均分,这种方式仅支持整型和字符串类型切分列;2.根据设置的splitPoint进行切分
+
+	* 必选:是 <br />
+ + * 默认值:无
+
+* **splitPoints**
+
+	* 描述:由于根据切分列最大最小值切分时不能保证避免数据热点,splitPoints支持用户根据数据特征动态指定切分点,对表数据进行切分。建议切分点根据Region的startkey和endkey设置,保证每个查询对应单个Region。
+
+	* 必选: 否 <br />
+ + * 默认值:无
+ +* **where** + + * 描述:支持对表查询增加过滤条件,每个切分都会携带该过滤条件。 + + * 必选: 否
+ + * 默认值:无
+
+* **querySql**
+
+	* 描述:支持指定多个查询语句,但查询列类型和数目必须保持一致。用户可根据实际情况手动输入表查询语句或多表联合查询语句。设置该参数后,除queryServerAddress参数必须设置外,其余参数将失去作用或可不设置。
+
+	* 必选: 否 <br />
+ + * 默认值:无
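As a concrete supplement to the parameter list above, here is a minimal sketch of a reader configuration that splits by explicit split points instead of min/max ranges. One inconsistency worth hedging: this document names the parameter **splitPoints**, while `Key.java` (`SPLIT_POINT = "splitPoint"`) and `plugin_job_template.json` in this patch register the key as `splitPoint`, so the sketch follows the code. The schema, table, column, and address values are illustrative assumptions only.

```json
{
    "name": "hbase20xsqlreader",
    "parameter": {
        "queryServerAddress": "http://127.0.0.1:8765",
        "serialization": "PROTOBUF",
        "schema": "TEST_SCHEMA",
        "table": "TEST",
        "column": ["ID", "NAME"],
        "splitKey": "ID",
        "splitPoint": ["1000", "2000", "3000"],
        "where": "NAME IS NOT NULL"
    }
}
```

Given N split points, `buildSplitRange()` in `HBase20SQLReaderHelper` generates N+1 range conditions (`ID <= 1000`, `ID > 1000 AND ID <= 2000`, ..., `ID > 3000`), so the example above yields four parallel query splits, each additionally carrying the configured `where` filter.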
+
+
+### 3.3 类型转换
+
+目前hbase20xsqlreader支持大部分Phoenix类型,但仍存在个别类型尚未支持的情况,请注意检查你的类型。
+
+下面列出hbase20xsqlreader针对Phoenix数据类型的转换列表:
+
+
+| DataX 内部类型| Phoenix 数据类型 |
+| -------- | ----- |
+| String |CHAR, VARCHAR|
+| Bytes |BINARY, VARBINARY|
+| Bool |BOOLEAN |
+| Long |INTEGER, TINYINT, SMALLINT, BIGINT |
+| Double |FLOAT, DECIMAL, DOUBLE |
+| Date |DATE, TIME, TIMESTAMP |
+
+
+
+## 4 性能报告
+
+略
+
+## 5 约束限制
+
+* 切分表时切分列仅支持单个列,且该列必须是表主键
+* 不设置splitPoint时默认使用自动切分,此时切分列仅支持整型和字符串类型
+* 表名和SCHEMA名及列名大小写敏感,请与Phoenix表实际大小写保持一致
+* 仅支持通过Phoenix QueryServer读取数据,因此您的Phoenix必须启动QueryServer服务才能使用本插件
+
+## 6 FAQ
+
+***
+
+
diff --git a/hbase20xsqlreader/pom.xml b/hbase20xsqlreader/pom.xml
new file mode 100644
index 00000000..2df9a1a2
--- /dev/null
+++ b/hbase20xsqlreader/pom.xml
@@ -0,0 +1,116 @@
+ + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + hbase20xsqlreader + 0.0.1-SNAPSHOT + jar + + + 5.0.0-HBase-2.0 + + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + + org.apache.phoenix + phoenix-queryserver + ${phoenix.version} + + + servlet-api + javax.servlet + + + + + + + junit + junit + test + + + org.mockito + mockito-core + 2.0.44-beta + test + + + com.alibaba.datax + datax-core + ${datax-project-version} + + + com.alibaba.datax + datax-service-face + + + test + + + com.alibaba.datax + plugin-rdbms-util + 0.0.1-SNAPSHOT + compile + + + + + + + src/main/java + + **/*.properties + + + + + + + maven-compiler-plugin + + 1.6 + 1.6 + ${project-sourceEncoding} + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + +
diff --git a/hbase20xsqlreader/src/main/assembly/package.xml b/hbase20xsqlreader/src/main/assembly/package.xml
new file mode 100644
index 00000000..c6ade25f
--- /dev/null
+++ b/hbase20xsqlreader/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+ + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/reader/hbase20xsqlreader + + + target/ + + hbase20xsqlreader-0.0.1-SNAPSHOT.jar + + plugin/reader/hbase20xsqlreader + + + + + + false + plugin/reader/hbase20xsqlreader/libs + runtime + +
diff --git a/hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/Constant.java b/hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/Constant.java
new file mode 100644
index 00000000..0190125f
--- /dev/null
+++ b/hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/Constant.java
@@ -0,0 +1,28 @@
+package com.alibaba.datax.plugin.reader.hbase20xsqlreader;
+
+public class Constant {
+    public static final String PK_TYPE = "pkType";
+
+    public static final Object PK_TYPE_STRING = "pkTypeString";
+
+    public static final Object PK_TYPE_LONG = "pkTypeLong";
+
+    public static final String DEFAULT_SERIALIZATION = "PROTOBUF";
+
+    public static final String CONNECT_STRING_TEMPLATE = "jdbc:phoenix:thin:url=%s;serialization=%s";
+
+    public static final String CONNECT_DRIVER_STRING = "org.apache.phoenix.queryserver.client.Driver";
+
+    public static final String SELECT_COLUMNS_TEMPLATE = "SELECT COLUMN_NAME, COLUMN_FAMILY FROM SYSTEM.CATALOG WHERE TABLE_NAME='%s' AND COLUMN_NAME IS NOT NULL";
+
+    public static String QUERY_SQL_TEMPLATE_WITHOUT_WHERE = "select %s from %s ";
+
+    public static String QUERY_SQL_TEMPLATE = "select %s from %s where (%s)";
+
+    public static String QUERY_MIN_MAX_TEMPLATE = "SELECT MIN(%s),MAX(%s) FROM %s";
+
+    public static String
QUERY_COLUMN_TYPE_TEMPLATE = "SELECT %s FROM %s LIMIT 1"; + + public static String QUERY_SQL_PER_SPLIT = "querySqlPerSplit"; + +} diff --git a/hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/HBase20SQLReaderHelper.java b/hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/HBase20SQLReaderHelper.java new file mode 100644 index 00000000..f2d880af --- /dev/null +++ b/hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/HBase20SQLReaderHelper.java @@ -0,0 +1,403 @@ +package com.alibaba.datax.plugin.reader.hbase20xsqlreader; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode; +import com.alibaba.datax.plugin.rdbms.util.RdbmsRangeSplitWrap; +import org.apache.commons.lang3.StringUtils; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.commons.lang3.tuple.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigInteger; +import java.sql.*; +import java.util.ArrayList; +import java.util.List; + +public class HBase20SQLReaderHelper { + private static final Logger LOG = LoggerFactory.getLogger(HBase20SQLReaderHelper.class); + + private Configuration configuration; + + private Connection connection; + private List querySql; + private String fullTableName; + private List columnNames; + private String splitKey; + private List splitPoints; + + + public HBase20SQLReaderHelper (Configuration configuration) { + this.configuration = configuration; + } + /** + * 校验配置参数是否正确 + */ + public void validateParameter() { + // queryserver地址必须配置 + String queryServerAddress = configuration.getNecessaryValue(Key.QUERYSERVER_ADDRESS, + HBase20xSQLReaderErrorCode.REQUIRED_VALUE); + String serialization = configuration.getString(Key.SERIALIZATION_NAME, Constant.DEFAULT_SERIALIZATION); + connection = getConnection(queryServerAddress, serialization); + + //判断querySql是否配置,如果配置则table配置可为空,否则table必须配置 + querySql = configuration.getList(Key.QUERY_SQL, String.class); + if (querySql == null || querySql.isEmpty()) { + LOG.info("Split according to splitKey or split points."); + + String schema = configuration.getString(Key.SCHEMA, null); + String tableName = configuration.getNecessaryValue(Key.TABLE, HBase20xSQLReaderErrorCode.REQUIRED_VALUE); + if (schema != null && !schema.isEmpty()) { + fullTableName = schema + "." 
+ tableName;
+            } else {
+                fullTableName = tableName;
+            }
+            // 如果列名未配置,默认读取全部列
+            columnNames = configuration.getList(Key.COLUMN, String.class);
+            splitKey = configuration.getString(Key.SPLIT_KEY, null);
+            splitPoints = configuration.getList(Key.SPLIT_POINT);
+            checkTable(schema, tableName);
+            dealWhere();
+        } else {
+            // 用户指定querySql,切分不做处理,根据给定sql读取数据即可
+            LOG.info("Split according to query sql.");
+        }
+    }
+
+    public Connection getConnection(String queryServerAddress, String serialization) {
+        String url = String.format(Constant.CONNECT_STRING_TEMPLATE, queryServerAddress, serialization);
+        LOG.debug("Connecting to QueryServer [" + url + "] ...");
+        Connection conn;
+        try {
+            Class.forName(Constant.CONNECT_DRIVER_STRING);
+            conn = DriverManager.getConnection(url);
+            conn.setAutoCommit(false);
+        } catch (Throwable e) {
+            throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.GET_QUERYSERVER_CONNECTION_ERROR,
+                    "无法连接QueryServer,配置不正确或服务未启动,请检查配置和服务状态或者联系HBase管理员.", e);
+        }
+        LOG.debug("Connected to QueryServer successfully.");
+        return conn;
+    }
+
+    /**
+     * 检查表名、列名和切分列是否存在
+     */
+    public void checkTable(String schema, String tableName) {
+        Statement statement = null;
+        ResultSet resultSet = null;
+        try {
+            statement = connection.createStatement();
+            String selectSql = String.format(Constant.SELECT_COLUMNS_TEMPLATE, tableName);
+
+            // 处理schema不为空情况
+            if (schema == null || schema.isEmpty()) {
+                selectSql = selectSql + " AND TABLE_SCHEM IS NULL";
+            } else {
+                selectSql = selectSql + " AND TABLE_SCHEM = '" + schema + "'";
+            }
+            resultSet = statement.executeQuery(selectSql);
+            List<String> primaryColumnNames = new ArrayList<String>();
+            List<String> allColumnName = new ArrayList<String>();
+            while (resultSet.next()) {
+                String columnName = resultSet.getString(1);
+                allColumnName.add(columnName);
+                // 列族为空表示该列为主键列
+                if (resultSet.getString(2) == null) {
+                    primaryColumnNames.add(columnName);
+                }
+            }
+            if (columnNames != null && !columnNames.isEmpty()) {
+                for (String columnName : columnNames) {
+                    if (!allColumnName.contains(columnName)) {
+                        // 用户配置的列名在元数据中不存在
+                        throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.ILLEGAL_VALUE,
+                                "您配置的列" + columnName + "在表" + tableName + "的元数据中不存在,请检查您的配置或者联系HBase管理员.");
+                    }
+                }
+            } else {
+                columnNames = allColumnName;
+                configuration.set(Key.COLUMN, allColumnName);
+            }
+            if (splitKey != null) {
+                // 切分列必须是主键列,否则会严重影响读取性能
+                if (!primaryColumnNames.contains(splitKey)) {
+                    throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.ILLEGAL_VALUE,
+                            "您配置的切分列" + splitKey + "不是表" + tableName + "的主键,请检查您的配置或者联系HBase管理员.");
+                }
+            }
+
+        } catch (SQLException e) {
+            throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.GET_PHOENIX_TABLE_ERROR,
+                    "获取表" + tableName + "信息失败,请检查您的集群和表状态或者联系HBase管理员.", e);
+
+        } finally {
+            closeJdbc(null, statement, resultSet);
+        }
+    }
+
+    public void closeJdbc(Connection connection, Statement statement, ResultSet resultSet) {
+        try {
+            if (resultSet != null) {
+                resultSet.close();
+            }
+            if (statement != null) {
+                statement.close();
+            }
+            if (connection != null) {
+                connection.close();
+            }
+        } catch (SQLException e) {
+            LOG.warn("数据库连接关闭异常. {}", HBase20xSQLReaderErrorCode.CLOSE_PHOENIX_CONNECTION_ERROR, e);
+        }
+    }
+
+    public void dealWhere() {
+        String where = configuration.getString(Key.WHERE, null);
+        if (StringUtils.isNotBlank(where)) {
+            String whereImprove = where.trim();
+            // 同时兼容半角分号和全角分号结尾的where配置
+            if (whereImprove.endsWith(";") || whereImprove.endsWith(";")) {
+                whereImprove = whereImprove.substring(0, whereImprove.length() - 1);
+            }
+            configuration.set(Key.WHERE, whereImprove);
+        }
+    }
+
+    /**
+     * 对表进行切分
+     */
+    public List<Configuration> doSplit(int adviceNumber) {
+        List<Configuration> pluginParams = new ArrayList<Configuration>();
+        List<String> rangeList;
+        String where = configuration.getString(Key.WHERE);
+        boolean hasWhere = StringUtils.isNotBlank(where);
+        if (querySql == null || querySql.isEmpty()) {
+            // 如果splitPoints为空,则根据splitKey自动切分,不过这种切分方式无法保证数据均分,且只支持整型和字符型列
+            if (splitPoints == null || splitPoints.isEmpty()) {
+                LOG.info("Split according to min and max value of splitColumn...");
+                Pair<Object, Object> minMaxPK = getPkRange(configuration);
+                if (null == minMaxPK) {
+                    throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.ILLEGAL_SPLIT_PK,
+                            "根据切分主键切分表失败. DataX仅支持切分主键为一个,并且类型为整数或者字符串类型. " +
+                                    "请尝试使用其他的切分主键或者联系 HBase管理员 进行处理.");
+                }
+                if (null == minMaxPK.getLeft() || null == minMaxPK.getRight()) {
+                    // 切分后获取到的start/end 有 Null 的情况
+                    pluginParams.add(configuration);
+                    return pluginParams;
+                }
+                boolean isStringType = Constant.PK_TYPE_STRING.equals(configuration
+                        .getString(Constant.PK_TYPE));
+                boolean isLongType = Constant.PK_TYPE_LONG.equals(configuration
+                        .getString(Constant.PK_TYPE));
+                if (isStringType) {
+                    rangeList = RdbmsRangeSplitWrap.splitAndWrap(
+                            String.valueOf(minMaxPK.getLeft()),
+                            String.valueOf(minMaxPK.getRight()), adviceNumber,
+                            splitKey, "'", null);
+                } else if (isLongType) {
+                    rangeList = RdbmsRangeSplitWrap.splitAndWrap(
+                            new BigInteger(minMaxPK.getLeft().toString()),
+                            new BigInteger(minMaxPK.getRight().toString()),
+                            adviceNumber, splitKey);
+                } else {
+                    throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.ILLEGAL_SPLIT_PK,
+                            "您配置的切分主键(splitPk) 类型 DataX 不支持. DataX 仅支持切分主键为一个,并且类型为整数或者字符串类型. " +
+                                    "请尝试使用其他的切分主键或者联系HBase管理员进行处理.");
+                }
+
+            } else {
+                LOG.info("Split according to splitPoints...");
+                // 根据指定splitPoints进行切分
+                rangeList = buildSplitRange();
+            }
+            String tempQuerySql;
+            if (null != rangeList && !rangeList.isEmpty()) {
+                for (String range : rangeList) {
+                    Configuration tempConfig = configuration.clone();
+
+                    tempQuerySql = buildQuerySql(columnNames, fullTableName, where)
+                            + (hasWhere ? " and " : " where ") + range;
+                    LOG.info("Query SQL: " + tempQuerySql);
+                    tempConfig.set(Constant.QUERY_SQL_PER_SPLIT, tempQuerySql);
+                    pluginParams.add(tempConfig);
+                }
+            } else {
+                Configuration tempConfig = configuration.clone();
+                tempQuerySql = buildQuerySql(columnNames, fullTableName, where)
+                        + (hasWhere ?
" and " : " where ") + + String.format(" %s IS NOT NULL", splitKey); + LOG.info("Query SQL: " + tempQuerySql); + tempConfig.set(Constant.QUERY_SQL_PER_SPLIT, tempQuerySql); + pluginParams.add(tempConfig); + } + } else { + // 指定querySql不需要切分 + for (String sql : querySql) { + Configuration tempConfig = configuration.clone(); + tempConfig.set(Constant.QUERY_SQL_PER_SPLIT, sql); + pluginParams.add(tempConfig); + } + } + return pluginParams; + } + + public static String buildQuerySql(List columnNames, String table, + String where) { + String querySql; + StringBuilder columnBuilder = new StringBuilder(); + for (String columnName : columnNames) { + columnBuilder.append(columnName).append(","); + } + columnBuilder.setLength(columnBuilder.length() -1); + if (StringUtils.isBlank(where)) { + querySql = String.format(Constant.QUERY_SQL_TEMPLATE_WITHOUT_WHERE, + columnBuilder.toString(), table); + } else { + querySql = String.format(Constant.QUERY_SQL_TEMPLATE, columnBuilder.toString(), + table, where); + } + return querySql; + } + + private List buildSplitRange() { + String getSplitKeyTypeSQL = String.format(Constant.QUERY_COLUMN_TYPE_TEMPLATE, splitKey, fullTableName); + Statement statement = null; + ResultSet resultSet = null; + List splitConditions = new ArrayList(); + + try { + statement = connection.createStatement(); + resultSet = statement.executeQuery(getSplitKeyTypeSQL); + ResultSetMetaData rsMetaData = resultSet.getMetaData(); + int type = rsMetaData.getColumnType(1); + String symbol = "%s"; + switch (type) { + case Types.CHAR: + case Types.VARCHAR: + symbol = "'%s'"; + break; + case Types.DATE: + symbol = "TO_DATE('%s')"; + break; + case Types.TIME: + symbol = "TO_TIME('%s')"; + break; + case Types.TIMESTAMP: + symbol = "TO_TIMESTAMP('%s')"; + break; + case Types.BINARY: + case Types.VARBINARY: + case Types.ARRAY: + throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.ILLEGAL_SPLIT_PK, + "切分列类型为" + rsMetaData.getColumnTypeName(1) + ",暂不支持该类型字段作为切分列。"); + } + String splitCondition = null; + for (int i = 0; i <= splitPoints.size(); i++) { + if (i == 0) { + splitCondition = splitKey + " <= " + String.format(symbol, splitPoints.get(i)); + } else if (i == splitPoints.size()) { + splitCondition = splitKey + " > " + String.format(symbol, splitPoints.get(i - 1)); + } else { + splitCondition = splitKey + " > " + String.format(symbol, splitPoints.get(i - 1)) + + " AND " + splitKey + " <= " + String.format(symbol, splitPoints.get(i)); + } + splitConditions.add(splitCondition); + } + + return splitConditions; + } catch (SQLException e) { + throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.GET_TABLE_COLUMNTYPE_ERROR, + "获取切分列类型失败,请检查服务或给定表和切分列是否正常,或者联系HBase管理员进行处理。", e); + } finally { + closeJdbc(null, statement, resultSet); + } + + } + + private Pair getPkRange(Configuration configuration) { + String pkRangeSQL = String.format(Constant.QUERY_MIN_MAX_TEMPLATE, splitKey, splitKey, fullTableName); + String where = configuration.getString(Key.WHERE); + if (StringUtils.isNotBlank(where)) { + pkRangeSQL = String.format("%s WHERE (%s AND %s IS NOT NULL)", + pkRangeSQL, where, splitKey); + } + Statement statement = null; + ResultSet resultSet = null; + Pair minMaxPK = null; + + try { + statement = connection.createStatement(); + resultSet = statement.executeQuery(pkRangeSQL); + ResultSetMetaData rsMetaData = resultSet.getMetaData(); + + if (isPKTypeValid(rsMetaData)) { + if (isStringType(rsMetaData.getColumnType(1))) { + if(configuration != null) { + configuration + 
.set(Constant.PK_TYPE, Constant.PK_TYPE_STRING); + } + if (resultSet.next()) { + minMaxPK = new ImmutablePair( + resultSet.getString(1), resultSet.getString(2)); + } + } else if (isLongType(rsMetaData.getColumnType(1))) { + if(configuration != null) { + configuration.set(Constant.PK_TYPE, Constant.PK_TYPE_LONG); + } + if (resultSet.next()) { + minMaxPK = new ImmutablePair( + resultSet.getLong(1), resultSet.getLong(2)); + } + } else { + throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.ILLEGAL_SPLIT_PK, + "您配置的DataX切分主键(splitPk)有误. 因为您配置的切分主键(splitPk) 类型 DataX 不支持. " + + "DataX 仅支持切分主键为一个,并且类型为整数或者字符串类型. 请尝试使用其他的切分主键或者联系HBASE管理员进行处理."); + } + } else { + throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.ILLEGAL_SPLIT_PK, + "您配置的DataX切分主键(splitPk)有误. 因为您配置的切分主键(splitPk) 类型 DataX 不支持. " + + "DataX 仅支持切分主键为一个,并且类型为整数或者字符串类型. 请尝试使用其他的切分主键或者联系HBASE管理员进行处理."); + } + } catch (SQLException e) { + throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.ILLEGAL_SPLIT_PK, e); + } finally { + closeJdbc(null, statement, resultSet); + } + + return minMaxPK; + } + + private static boolean isPKTypeValid(ResultSetMetaData rsMetaData) { + boolean ret = false; + try { + int minType = rsMetaData.getColumnType(1); + int maxType = rsMetaData.getColumnType(2); + + boolean isNumberType = isLongType(minType); + + boolean isStringType = isStringType(minType); + + if (minType == maxType && (isNumberType || isStringType)) { + ret = true; + } + } catch (Exception e) { + throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_SPLIT_PK, + "DataX获取切分主键(splitPk)字段类型失败. 该错误通常是系统底层异常导致. 请联系旺旺:askdatax或者DBA处理."); + } + return ret; + } + + private static boolean isLongType(int type) { + boolean isValidLongType = type == Types.BIGINT || type == Types.INTEGER + || type == Types.SMALLINT || type == Types.TINYINT; + return isValidLongType; + } + + private static boolean isStringType(int type) { + return type == Types.CHAR || type == Types.NCHAR + || type == Types.VARCHAR || type == Types.LONGVARCHAR + || type == Types.NVARCHAR; + } +} diff --git a/hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/HBase20xSQLReader.java b/hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/HBase20xSQLReader.java new file mode 100644 index 00000000..2072c2c0 --- /dev/null +++ b/hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/HBase20xSQLReader.java @@ -0,0 +1,53 @@ +package com.alibaba.datax.plugin.reader.hbase20xsqlreader; + +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.spi.Reader; +import com.alibaba.datax.common.util.Configuration; + +import java.util.List; + +public class HBase20xSQLReader extends Reader { + + public static class Job extends Reader.Job { + private Configuration originalConfig; + private HBase20SQLReaderHelper readerHelper; + @Override + public void init() { + this.originalConfig = this.getPluginJobConf(); + this.readerHelper = new HBase20SQLReaderHelper(this.originalConfig); + readerHelper.validateParameter(); + } + + @Override + public List split(int adviceNumber) { + return readerHelper.doSplit(adviceNumber); + } + + @Override + public void destroy() { + // do nothing + } + } + + public static class Task extends Reader.Task { + private Configuration readerConfig; + private HBase20xSQLReaderTask hbase20xSQLReaderTask; + + @Override + public void init() { + this.readerConfig = super.getPluginJobConf(); + hbase20xSQLReaderTask = new 
HBase20xSQLReaderTask(readerConfig, super.getTaskGroupId(), super.getTaskId());
+        }
+
+        @Override
+        public void startRead(RecordSender recordSender) {
+            hbase20xSQLReaderTask.readRecord(recordSender);
+        }
+
+        @Override
+        public void destroy() {
+            // do nothing
+        }
+
+    }
+}
diff --git a/hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/HBase20xSQLReaderErrorCode.java b/hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/HBase20xSQLReaderErrorCode.java
new file mode 100644
index 00000000..415bf71f
--- /dev/null
+++ b/hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/HBase20xSQLReaderErrorCode.java
@@ -0,0 +1,39 @@
+package com.alibaba.datax.plugin.reader.hbase20xsqlreader;
+
+import com.alibaba.datax.common.spi.ErrorCode;
+
+public enum HBase20xSQLReaderErrorCode implements ErrorCode {
+    REQUIRED_VALUE("Hbasereader-00", "您缺失了必须填写的参数值."),
+    ILLEGAL_VALUE("Hbasereader-01", "您填写的参数值不合法."),
+    GET_QUERYSERVER_CONNECTION_ERROR("Hbasereader-02", "获取QueryServer连接时出错."),
+    GET_PHOENIX_TABLE_ERROR("Hbasereader-03", "获取 Phoenix table时出错."),
+    GET_TABLE_COLUMNTYPE_ERROR("Hbasereader-05", "获取表列类型时出错."),
+    CLOSE_PHOENIX_CONNECTION_ERROR("Hbasereader-06", "关闭JDBC连接时出错."),
+    ILLEGAL_SPLIT_PK("Hbasereader-07", "非法splitKey配置."),
+    PHOENIX_COLUMN_TYPE_CONVERT_ERROR("Hbasereader-08", "phoenix的列类型转换错误."),
+    QUERY_DATA_ERROR("Hbasereader-09", "查询Phoenix数据时发生异常."),
+    ;
+
+    private final String code;
+    private final String description;
+
+    private HBase20xSQLReaderErrorCode(String code, String description) {
+        this.code = code;
+        this.description = description;
+    }
+
+    @Override
+    public String getCode() {
+        return this.code;
+    }
+
+    @Override
+    public String getDescription() {
+        return this.description;
+    }
+
+    @Override
+    public String toString() {
+        return String.format("Code:[%s], Description:[%s].", this.code, this.description);
+    }
+}
diff --git a/hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/HBase20xSQLReaderTask.java b/hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/HBase20xSQLReaderTask.java
new file mode 100644
index 00000000..866cef38
--- /dev/null
+++ b/hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/HBase20xSQLReaderTask.java
@@ -0,0 +1,121 @@
+package com.alibaba.datax.plugin.reader.hbase20xsqlreader;
+
+import com.alibaba.datax.common.element.*;
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.plugin.RecordSender;
+import com.alibaba.datax.common.statistics.PerfRecord;
+import com.alibaba.datax.common.util.Configuration;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.math.BigDecimal;
+import java.sql.*;
+
+public class HBase20xSQLReaderTask {
+    private static final Logger LOG = LoggerFactory.getLogger(HBase20xSQLReaderTask.class);
+
+    private Configuration readerConfig;
+    private int taskGroupId = -1;
+    private int taskId = -1;
+
+    public HBase20xSQLReaderTask(Configuration config, int taskGroupId, int taskId) {
+        this.readerConfig = config;
+        this.taskGroupId = taskGroupId;
+        this.taskId = taskId;
+    }
+
+    public void readRecord(RecordSender recordSender) {
+        String querySql = readerConfig.getString(Constant.QUERY_SQL_PER_SPLIT);
+        LOG.info("Begin to read record by Sql: [{}].", querySql);
+        HBase20SQLReaderHelper helper = new HBase20SQLReaderHelper(readerConfig);
+        Connection conn = helper.getConnection(readerConfig.getString(Key.QUERYSERVER_ADDRESS),
+                readerConfig.getString(Key.SERIALIZATION_NAME, Constant.DEFAULT_SERIALIZATION));
+        Statement statement = null;
+        ResultSet resultSet = null;
+        try {
+            long rsNextUsedTime = 0;
+            long lastTime = System.nanoTime();
+            statement = conn.createStatement();
+            // 统计查询时间
+            PerfRecord queryPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.SQL_QUERY);
+            queryPerfRecord.start();
+
+            resultSet = statement.executeQuery(querySql);
+            ResultSetMetaData meta = resultSet.getMetaData();
+            int columnNum = meta.getColumnCount();
+            // 统计result_Next时间
+            PerfRecord allResultPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);
+            allResultPerfRecord.start();
+
+            while (resultSet.next()) {
+                Record record = recordSender.createRecord();
+                rsNextUsedTime += (System.nanoTime() - lastTime);
+                for (int i = 1; i <= columnNum; i++) {
+                    Column column = this.convertPhoenixValueToDataxColumn(meta.getColumnType(i), resultSet.getObject(i));
+                    record.addColumn(column);
+                }
+                lastTime = System.nanoTime();
+                recordSender.sendToWriter(record);
+            }
+            allResultPerfRecord.end(rsNextUsedTime);
+            LOG.info("Finished reading records by Sql: [{}].", querySql);
+        } catch (SQLException e) {
+            throw DataXException.asDataXException(
+                    HBase20xSQLReaderErrorCode.QUERY_DATA_ERROR, "查询Phoenix数据出现异常,请检查服务状态或与HBase管理员联系!", e);
+        } finally {
+            helper.closeJdbc(conn, statement, resultSet);
+        }
+
+    }
+
+    private Column convertPhoenixValueToDataxColumn(int sqlType, Object value) {
+        Column column;
+        switch (sqlType) {
+            case Types.CHAR:
+            case Types.VARCHAR:
+                column = new StringColumn((String) value);
+                break;
+            case Types.BINARY:
+            case Types.VARBINARY:
+                column = new BytesColumn((byte[]) value);
+                break;
+            case Types.BOOLEAN:
+                column = new BoolColumn((Boolean) value);
+                break;
+            case Types.INTEGER:
+                column = new LongColumn((Integer) value);
+                break;
+            case Types.TINYINT:
+                column = new LongColumn(((Byte) value).longValue());
+                break;
+            case Types.SMALLINT:
+                column = new LongColumn(((Short) value).longValue());
+                break;
+            case Types.BIGINT:
+                column = new LongColumn((Long) value);
+                break;
+            case Types.FLOAT:
+                column = new DoubleColumn(Float.valueOf(value.toString()));
+                break;
+            case Types.DECIMAL:
+                column = new DoubleColumn((BigDecimal) value);
+                break;
+            case Types.DOUBLE:
+                column = new DoubleColumn((Double) value);
+                break;
+            case Types.DATE:
+                column = new DateColumn((Date) value);
+                break;
+            case Types.TIME:
+                column = new DateColumn((Time) value);
+                break;
+            case Types.TIMESTAMP:
+                column = new DateColumn((Timestamp) value);
+                break;
+            default:
+                throw DataXException.asDataXException(
+                        HBase20xSQLReaderErrorCode.PHOENIX_COLUMN_TYPE_CONVERT_ERROR, "遇到不可识别的phoenix类型," + "sqlType :" + sqlType);
+        }
+        return column;
+    }
+}
diff --git a/hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/Key.java b/hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/Key.java
new file mode 100644
index 00000000..43d811ac
--- /dev/null
+++ b/hbase20xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase20xsqlreader/Key.java
@@ -0,0 +1,40 @@
+package com.alibaba.datax.plugin.reader.hbase20xsqlreader;
+
+public class Key {
+    /**
+     * 【必选】reader要读取的表的表名
+     */
+    public final static String TABLE = "table";
+    /**
+     * 【必选】reader要读取哪些列
+     */
+    public final static String COLUMN = "column";
+    /**
+     * 【必选】Phoenix QueryServer服务地址
+     */
"queryServerAddress"; + /** + * 【可选】序列化格式,默认为PROTOBUF + */ + public static final String SERIALIZATION_NAME = "serialization"; + /** + * 【可选】Phoenix表所属schema,默认为空 + */ + public static final String SCHEMA = "schema"; + /** + * 【可选】读取数据时切分列 + */ + public static final String SPLIT_KEY = "splitKey"; + /** + * 【可选】读取数据时切分点 + */ + public static final String SPLIT_POINT = "splitPoint"; + /** + * 【可选】读取数据过滤条件配置 + */ + public static final String WHERE = "where"; + /** + * 【可选】查询语句配置 + */ + public static final String QUERY_SQL = "querySql"; +} diff --git a/hbase20xsqlreader/src/main/resources/plugin.json b/hbase20xsqlreader/src/main/resources/plugin.json new file mode 100644 index 00000000..45856411 --- /dev/null +++ b/hbase20xsqlreader/src/main/resources/plugin.json @@ -0,0 +1,7 @@ +{ + "name": "hbase20xsqlreader", + "class": "com.alibaba.datax.plugin.reader.hbase20xsqlreader.HBase20xSQLReader", + "description": "useScene: prod. mechanism: read data from phoenix through queryserver.", + "developer": "bake" +} + diff --git a/hbase20xsqlreader/src/main/resources/plugin_job_template.json b/hbase20xsqlreader/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..256c30cc --- /dev/null +++ b/hbase20xsqlreader/src/main/resources/plugin_job_template.json @@ -0,0 +1,13 @@ +{ + "name": "hbase20xsqlreader", + "parameter": { + "queryserverAddress": "", + "serialization": "PROTOBUF", + "schema": "", + "table": "TABLE1", + "column": ["ID", "NAME"], + "splitKey": "rowkey", + "splitPoint":[], + "where": "" + } +} diff --git a/hbase20xsqlwriter/doc/hbase20xsqlwriter.md b/hbase20xsqlwriter/doc/hbase20xsqlwriter.md new file mode 100644 index 00000000..63e4a431 --- /dev/null +++ b/hbase20xsqlwriter/doc/hbase20xsqlwriter.md @@ -0,0 +1,164 @@ +# HBase20xsqlwriter插件文档 + +## 1. 快速介绍 + +HBase20xsqlwriter实现了向hbase中的SQL表(phoenix)批量导入数据的功能。Phoenix因为对rowkey做了数据编码,所以,直接使用HBaseAPI进行写入会面临手工数据转换的问题,麻烦且易错。本插件提供了SQL方式直接向Phoenix表写入数据。 + +在底层实现上,通过Phoenix QueryServer的轻客户端驱动,执行UPSERT语句向Phoenix写入数据。 + +### 1.1 支持的功能 + +* 支持带索引的表的数据导入,可以同步更新所有的索引表 + + +### 1.2 限制 + +* 要求版本为Phoenix5.x及HBase2.x +* 仅支持通过Phoenix QeuryServer导入数据,因此您Phoenix必须启动QueryServer服务才能使用本插件 +* 不支持清空已有表数据 +* 仅支持通过phoenix创建的表,不支持原生HBase表 +* 不支持带时间戳的数据导入 + +## 2. 实现原理 + +通过Phoenix轻客户端,连接Phoenix QueryServer服务,执行UPSERT语句向表中批量写入数据。因为使用上层接口,所以,可以同步更新索引表。 + +## 3. 
+
+### 3.1 配置样例
+
+```json
+{
+  "job": {
+    "entry": {
+      "jvm": "-Xms2048m -Xmx2048m"
+    },
+    "content": [
+      {
+        "reader": {
+          "name": "txtfilereader",
+          "parameter": {
+            "path": "/Users/shf/workplace/datax_test/hbase20xsqlwriter/txt/normal.txt",
+            "charset": "UTF-8",
+            "column": [
+              {
+                "index": 0,
+                "type": "string"
+              },
+              {
+                "index": 1,
+                "type": "string"
+              },
+              {
+                "index": 2,
+                "type": "string"
+              },
+              {
+                "index": 3,
+                "type": "string"
+              }
+            ],
+            "fieldDelimiter": ","
+          }
+        },
+        "writer": {
+          "name": "hbase20xsqlwriter",
+          "parameter": {
+            "batchSize": "100",
+            "column": [
+              "UID",
+              "TS",
+              "EVENTID",
+              "CONTENT"
+            ],
+            "queryServerAddress": "http://127.0.0.1:8765",
+            "nullMode": "skip",
+            "table": "目标hbase表名,大小写相关"
+          }
+        }
+      }
+    ],
+    "setting": {
+      "speed": {
+        "channel": 5
+      }
+    }
+  }
+}
+```
+
+
+### 3.2 参数说明
+
+* **name**
+
+	* 描述:插件名字,必须是`hbase20xsqlwriter`
+	* 必选:是
+	* 默认值:无
+
+* **table**
+
+	* 描述:要导入的表名,大小写敏感,通常phoenix表都是**大写**表名
+	* 必选:是
+	* 默认值:无
+
+* **column**
+
+	* 描述:列名,大小写敏感,通常phoenix的列名都是**大写**。
+	* 需要注意列的顺序,必须与reader输出的列的顺序一一对应。
+	* 不需要填写数据类型,会自动从phoenix获取列的元数据
+	* 必选:是
+	* 默认值:无
+
+* **queryServerAddress**
+
+	* 描述:Phoenix QueryServer地址,为必填项,格式:http://${hostName}:${port},如http://172.16.34.58:8765
+	* 必选:是
+	* 默认值:无
+
+* **serialization**
+
+	* 描述:QueryServer使用的序列化协议
+	* 必选:否
+	* 默认值:PROTOBUF
+
+* **batchSize**
+
+	* 描述:批量写入的最大行数
+	* 必选:否
+	* 默认值:256
+
+* **nullMode**
+
+	* 描述:读取到的列值为null时,如何处理。目前有两种方式:
+		* skip:跳过这一列,即不插入这一列(如果该行的这一列之前已经存在,则会被删除)
+		* empty:插入空值,值类型的空值是0,varchar的空值是空字符串
+	* 必选:否
+	* 默认值:skip
+
+## 4. 性能报告
+
+无
+
+## 5. 约束限制
+
+writer中列的定义顺序必须与reader的列顺序匹配。reader中的列顺序定义了输出的每一行中列的组织顺序,而writer的列顺序定义的是writer期待在收到的数据中列的顺序。例如:
+
+reader的列顺序是: c1, c2, c3, c4
+
+writer的列顺序是: x1, x2, x3, x4
+
+则reader输出的列c1就会赋值给writer的列x1。如果writer的列顺序是x1, x2, x4, x3,则c3会赋值给x4,c4会赋值给x3。
+
+
+## 6. FAQ
+
+1. 并发开多少合适?速度慢时增加并发有用吗?
+	数据导入进程默认JVM的堆大小是2GB,并发(channel数)是通过多线程实现的,开过多的线程有时并不能提高导入速度,反而可能因为过于频繁的GC导致性能下降。一般建议并发数(channel)为5~10。
+
+2. batchSize设置多少比较合适?
+默认是256,但应根据每行的大小来计算最合适的batchSize。通常一次操作的数据量在2MB-4MB左右,用这个值除以行大小,即可得到batchSize。 + + + + diff --git a/hbase20xsqlwriter/pom.xml b/hbase20xsqlwriter/pom.xml new file mode 100644 index 00000000..2dc5f4c7 --- /dev/null +++ b/hbase20xsqlwriter/pom.xml @@ -0,0 +1,107 @@ + + + + datax-all + com.alibaba.datax + 0.0.1-SNAPSHOT + + 4.0.0 + + hbase20xsqlwriter + 0.0.1-SNAPSHOT + jar + + + 5.0.0-HBase-2.0 + 1.12.0 + 1.8 + + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + org.apache.phoenix + phoenix-queryserver + ${phoenix.version} + + + + + junit + junit + test + + + com.alibaba.datax + datax-core + ${datax-project-version} + + + com.alibaba.datax + datax-service-face + + + test + + + org.mockito + mockito-all + 1.9.5 + test + + + + + + + src/main/java + + **/*.properties + + + + + + + maven-compiler-plugin + + 1.6 + 1.6 + ${project-sourceEncoding} + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + + + \ No newline at end of file diff --git a/hbase20xsqlwriter/src/main/assembly/package.xml b/hbase20xsqlwriter/src/main/assembly/package.xml new file mode 100755 index 00000000..f2f7f679 --- /dev/null +++ b/hbase20xsqlwriter/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/writer/hbase20xsqlwriter + + + target/ + + hbase20xsqlwriter-0.0.1-SNAPSHOT.jar + + plugin/writer/hbase20xsqlwriter + + + + + + false + plugin/writer/hbase20xsqlwriter/libs + runtime + + + diff --git a/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/Constant.java b/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/Constant.java new file mode 100755 index 00000000..31760705 --- /dev/null +++ b/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/Constant.java @@ -0,0 +1,17 @@ +package com.alibaba.datax.plugin.writer.hbase20xsqlwriter; + +public final class Constant { + public static final String DEFAULT_NULL_MODE = "skip"; + public static final String DEFAULT_SERIALIZATION = "PROTOBUF"; + public static final int DEFAULT_BATCH_ROW_COUNT = 256; // 默认一次写256行 + + public static final int TYPE_UNSIGNED_TINYINT = 11; + public static final int TYPE_UNSIGNED_SMALLINT = 13; + public static final int TYPE_UNSIGNED_INTEGER = 9; + public static final int TYPE_UNSIGNED_LONG = 10; + public static final int TYPE_UNSIGNED_FLOAT = 14; + public static final int TYPE_UNSIGNED_DOUBLE = 15; + public static final int TYPE_UNSIGNED_DATE = 19; + public static final int TYPE_UNSIGNED_TIME = 18; + public static final int TYPE_UNSIGNED_TIMESTAMP = 20; +} diff --git a/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLHelper.java b/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLHelper.java new file mode 100644 index 00000000..f90b792b --- /dev/null +++ b/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLHelper.java @@ -0,0 +1,142 @@ +package com.alibaba.datax.plugin.writer.hbase20xsqlwriter; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.*; +import java.util.ArrayList; +import java.util.List; + +public class HBase20xSQLHelper { + private static final Logger LOG = 
LoggerFactory.getLogger(HBase20xSQLHelper.class); + + /** + * phoenix瘦客户端连接前缀 + */ + public static final String CONNECT_STRING_PREFIX = "jdbc:phoenix:thin:"; + /** + * phoenix驱动名 + */ + public static final String CONNECT_DRIVER_STRING = "org.apache.phoenix.queryserver.client.Driver"; + /** + * 从系统表查找配置表信息 + */ + public static final String SELECT_CATALOG_TABLE_STRING = "SELECT COLUMN_NAME FROM SYSTEM.CATALOG WHERE TABLE_NAME='%s' AND COLUMN_NAME IS NOT NULL"; + + /** + * 验证配置参数是否正确 + */ + public static void validateParameter(com.alibaba.datax.common.util.Configuration originalConfig) { + // 表名和queryserver地址必须配置,否则抛异常 + String tableName = originalConfig.getNecessaryValue(Key.TABLE, HBase20xSQLWriterErrorCode.REQUIRED_VALUE); + String queryServerAddress = originalConfig.getNecessaryValue(Key.QUERYSERVER_ADDRESS, HBase20xSQLWriterErrorCode.REQUIRED_VALUE); + + // 序列化格式,可不配置,默认PROTOBUF + String serialization = originalConfig.getString(Key.SERIALIZATION_NAME, Constant.DEFAULT_SERIALIZATION); + + String connStr = getConnectionUrl(queryServerAddress, serialization); + // 校验jdbc连接是否正常 + Connection conn = getThinClientConnection(connStr); + + List columnNames = originalConfig.getList(Key.COLUMN, String.class); + if (columnNames == null || columnNames.isEmpty()) { + throw DataXException.asDataXException( + HBase20xSQLWriterErrorCode.ILLEGAL_VALUE, "HBase的columns配置不能为空,请添加目标表的列名配置."); + } + String schema = originalConfig.getString(Key.SCHEMA); + // 检查表以及配置列是否存在 + checkTable(conn, schema, tableName, columnNames); + } + + /** + * 获取JDBC连接,轻量级连接,使用完后必须显式close + */ + public static Connection getThinClientConnection(String connStr) { + LOG.debug("Connecting to QueryServer [" + connStr + "] ..."); + Connection conn; + try { + Class.forName(CONNECT_DRIVER_STRING); + conn = DriverManager.getConnection(connStr); + conn.setAutoCommit(false); + } catch (Throwable e) { + throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.GET_QUERYSERVER_CONNECTION_ERROR, + "无法连接QueryServer,配置不正确或服务未启动,请检查配置和服务状态或者联系HBase管理员.", e); + } + LOG.debug("Connected to QueryServer successfully."); + return conn; + } + + public static Connection getJdbcConnection(Configuration conf) { + String queryServerAddress = conf.getNecessaryValue(Key.QUERYSERVER_ADDRESS, HBase20xSQLWriterErrorCode.REQUIRED_VALUE); + // 序列化格式,可不配置,默认PROTOBUF + String serialization = conf.getString(Key.SERIALIZATION_NAME, "PROTOBUF"); + String connStr = getConnectionUrl(queryServerAddress, serialization); + return getThinClientConnection(connStr); + } + + + public static String getConnectionUrl(String queryServerAddress, String serialization) { + String urlFmt = CONNECT_STRING_PREFIX + "url=%s;serialization=%s"; + return String.format(urlFmt, queryServerAddress, serialization); + } + + public static void checkTable(Connection conn, String schema, String tableName, List columnNames) throws DataXException { + String selectSystemTable = getSelectSystemSQL(schema, tableName); + Statement st = null; + ResultSet rs = null; + try { + st = conn.createStatement(); + rs = st.executeQuery(selectSystemTable); + List allColumns = new ArrayList(); + if (rs.next()) { + allColumns.add(rs.getString(1)); + } else { + LOG.error(tableName + "表不存在,请检查表名是否正确或是否已创建.", HBase20xSQLWriterErrorCode.GET_HBASE_TABLE_ERROR); + throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.GET_HBASE_TABLE_ERROR, + tableName + "表不存在,请检查表名是否正确或是否已创建."); + } + while (rs.next()) { + allColumns.add(rs.getString(1)); + } + for (String columnName : columnNames) { + if 
(!allColumns.contains(columnName)) { + // 用户配置的列名在元数据中不存在 + throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.ILLEGAL_VALUE, + "您配置的列" + columnName + "在目的表" + tableName + "的元数据中不存在,请检查您的配置或者联系HBase管理员."); + } + } + + } catch (SQLException t) { + throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.GET_HBASE_TABLE_ERROR, + "获取表" + tableName + "信息失败,请检查您的集群和表状态或者联系HBase管理员.", t); + } finally { + closeJdbc(conn, st, rs); + } + } + + private static String getSelectSystemSQL(String schema, String tableName) { + String sql = String.format(SELECT_CATALOG_TABLE_STRING, tableName); + if (schema != null) { + sql = sql + " AND TABLE_SCHEM = '" + schema + "'"; + } + return sql; + } + + public static void closeJdbc(Connection connection, Statement statement, ResultSet resultSet) { + try { + if (resultSet != null) { + resultSet.close(); + } + if (statement != null) { + statement.close(); + } + if (connection != null) { + connection.close(); + } + } catch (SQLException e) { + LOG.warn("数据库连接关闭异常.", HBase20xSQLWriterErrorCode.CLOSE_HBASE_CONNECTION_ERROR); + } + } +} diff --git a/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLWriter.java b/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLWriter.java new file mode 100644 index 00000000..2c08b734 --- /dev/null +++ b/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLWriter.java @@ -0,0 +1,58 @@ +package com.alibaba.datax.plugin.writer.hbase20xsqlwriter; + +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.spi.Writer; +import com.alibaba.datax.common.util.Configuration; + +import java.util.ArrayList; +import java.util.List; + +public class HBase20xSQLWriter extends Writer { + + public static class Job extends Writer.Job { + + private Configuration config = null; + + @Override + public void init() { + this.config = this.getPluginJobConf(); + HBase20xSQLHelper.validateParameter(this.config); + } + + @Override + public List split(int mandatoryNumber) { + List splitResultConfigs = new ArrayList(); + for (int j = 0; j < mandatoryNumber; j++) { + splitResultConfigs.add(config.clone()); + } + return splitResultConfigs; + } + + @Override + public void destroy() { + //doNothing + } + } + + public static class Task extends Writer.Task { + private Configuration taskConfig; + private HBase20xSQLWriterTask writerTask; + + @Override + public void init() { + this.taskConfig = super.getPluginJobConf(); + this.writerTask = new HBase20xSQLWriterTask(this.taskConfig); + } + + @Override + public void startWrite(RecordReceiver lineReceiver) { + this.writerTask.startWriter(lineReceiver, super.getTaskPluginCollector()); + } + + + @Override + public void destroy() { + // 不需要close + } + } +} \ No newline at end of file diff --git a/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLWriterErrorCode.java b/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLWriterErrorCode.java new file mode 100644 index 00000000..f946e20b --- /dev/null +++ b/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLWriterErrorCode.java @@ -0,0 +1,37 @@ +package com.alibaba.datax.plugin.writer.hbase20xsqlwriter; + +import com.alibaba.datax.common.spi.ErrorCode; + +public enum HBase20xSQLWriterErrorCode implements ErrorCode { + REQUIRED_VALUE("Hbasewriter-00", "您缺失了必须填写的参数值."), + 
ILLEGAL_VALUE("Hbasewriter-01", "您填写的参数值不合法."), + GET_QUERYSERVER_CONNECTION_ERROR("Hbasewriter-02", "获取QueryServer连接时出错."), + GET_HBASE_TABLE_ERROR("Hbasewriter-03", "获取 Hbase table时出错."), + CLOSE_HBASE_CONNECTION_ERROR("Hbasewriter-04", "关闭Hbase连接时出错."), + GET_TABLE_COLUMNTYPE_ERROR("Hbasewriter-05", "获取表列类型时出错."), + PUT_HBASE_ERROR("Hbasewriter-07", "写入hbase时发生IO异常."), + ; + + private final String code; + private final String description; + + private HBase20xSQLWriterErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + + @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s].", this.code, this.description); + } +} diff --git a/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLWriterTask.java b/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLWriterTask.java new file mode 100644 index 00000000..5557e674 --- /dev/null +++ b/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/HBase20xSQLWriterTask.java @@ -0,0 +1,389 @@ +package com.alibaba.datax.plugin.writer.hbase20xsqlwriter; + +import com.alibaba.datax.common.element.Column; +import com.alibaba.datax.common.element.Record; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.plugin.RecordReceiver; +import com.alibaba.datax.common.plugin.TaskPluginCollector; +import com.alibaba.datax.common.util.Configuration; +import com.google.common.collect.Lists; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.math.BigDecimal; +import java.sql.*; +import java.util.Arrays; +import java.util.List; + +public class HBase20xSQLWriterTask { + private final static Logger LOG = LoggerFactory.getLogger(HBase20xSQLWriterTask.class); + + private Configuration configuration; + private TaskPluginCollector taskPluginCollector; + + private Connection connection = null; + private PreparedStatement pstmt = null; + + // 需要向hbsae写入的列的数量,即用户配置的column参数中列的个数。时间戳不包含在内 + private int numberOfColumnsToWrite; + // 期待从源头表的Record中拿到多少列 + private int numberOfColumnsToRead; + private int[] columnTypes; + private List columns; + private String fullTableName; + + private NullModeType nullModeType; + private int batchSize; + + public HBase20xSQLWriterTask(Configuration configuration) { + // 这里仅解析配置,不访问远端集群,配置的合法性检查在writer的init过程中进行 + this.configuration = configuration; + } + + public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) { + this.taskPluginCollector = taskPluginCollector; + + try { + // 准备阶段 + initialize(); + + // 写入数据 + writeData(lineReceiver); + + } catch (Throwable e) { + throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.PUT_HBASE_ERROR, e); + } finally { + // 关闭jdbc连接 + HBase20xSQLHelper.closeJdbc(connection, pstmt, null); + } + + } + + /** + * 初始化JDBC操作对象及列类型 + * @throws SQLException + */ + private void initialize() throws SQLException { + if (connection == null) { + connection = HBase20xSQLHelper.getJdbcConnection(configuration); + connection.setAutoCommit(false); + } + nullModeType = NullModeType.getByTypeName(configuration.getString(Key.NULLMODE, Constant.DEFAULT_NULL_MODE)); + batchSize = configuration.getInt(Key.BATCHSIZE, Constant.DEFAULT_BATCH_ROW_COUNT); + String schema = 
configuration.getString(Key.SCHEMA);
+        String tableName = configuration.getNecessaryValue(Key.TABLE, HBase20xSQLWriterErrorCode.REQUIRED_VALUE);
+        fullTableName = tableName;
+        if (schema != null && !schema.isEmpty()) {
+            fullTableName = schema + "." + tableName;
+        }
+        columns = configuration.getList(Key.COLUMN, String.class);
+        if (pstmt == null) {
+            // 一个Task的生命周期中只使用一个PreparedStatement对象
+            pstmt = createPreparedStatement();
+            columnTypes = getColumnSqlType();
+        }
+    }
+
+    /**
+     * 生成sql模板,并根据模板创建PreparedStatement
+     */
+    private PreparedStatement createPreparedStatement() throws SQLException {
+        // 生成列名集合,列之间用逗号分隔: col1,col2,col3,...
+        StringBuilder columnNamesBuilder = new StringBuilder();
+        for (String col : columns) {
+            // 列名使用双引号,则不自动转换为全大写,而是保留用户配置的大小写
+            columnNamesBuilder.append("\"");
+            columnNamesBuilder.append(col);
+            columnNamesBuilder.append("\"");
+            columnNamesBuilder.append(",");
+        }
+        // 移除末尾多余的逗号
+        columnNamesBuilder.setLength(columnNamesBuilder.length() - 1);
+        String columnNames = columnNamesBuilder.toString();
+        numberOfColumnsToWrite = columns.size();
+        numberOfColumnsToRead = numberOfColumnsToWrite; // 开始的时候,要读的列数与要写的列数相等
+
+        // 生成UPSERT模板
+        StringBuilder upsertBuilder =
+                new StringBuilder("upsert into " + fullTableName + " (" + columnNames + " ) values (");
+        for (int i = 0; i < numberOfColumnsToWrite; i++) {
+            upsertBuilder.append("?,");
+        }
+        upsertBuilder.setLength(upsertBuilder.length() - 1); // 移除末尾多余的逗号
+        upsertBuilder.append(")");
+
+        String sql = upsertBuilder.toString();
+        PreparedStatement ps = connection.prepareStatement(sql);
+        LOG.debug("SQL template generated: " + sql);
+        return ps;
+    }
+
+    /**
+     * 根据列名来从数据库元数据中获取这一列对应的SQL类型
+     */
+    private int[] getColumnSqlType() throws SQLException {
+        int[] types = new int[numberOfColumnsToWrite];
+        StringBuilder columnNamesBuilder = new StringBuilder();
+        for (String columnName : columns) {
+            columnNamesBuilder.append(columnName).append(",");
+        }
+        columnNamesBuilder.setLength(columnNamesBuilder.length() - 1);
+        // 查询一条数据获取表meta信息
+        String selectSql = "SELECT " + columnNamesBuilder + " FROM " + fullTableName + " LIMIT 1";
+        Statement statement = null;
+        try {
+            statement = connection.createStatement();
+            ResultSetMetaData meta = statement.executeQuery(selectSql).getMetaData();
+
+            for (int i = 0; i < columns.size(); i++) {
+                String name = columns.get(i);
+                types[i] = meta.getColumnType(i + 1);
+                LOG.debug("Column name : " + name + ", sql type = " + types[i] + " " + meta.getColumnTypeName(i + 1));
+            }
+        } catch (SQLException e) {
+            throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.GET_TABLE_COLUMNTYPE_ERROR,
+                    "获取表" + fullTableName + "列类型失败,请检查配置和服务状态或者联系HBase管理员.", e);
+        } finally {
+            HBase20xSQLHelper.closeJdbc(null, statement, null);
+        }
+
+        return types;
+    }
+
+    /**
+     * 从接收器中获取每条记录,写入Phoenix
+     */
+    private void writeData(RecordReceiver lineReceiver) throws SQLException {
+        List<Record> buffer = Lists.newArrayListWithExpectedSize(batchSize);
+        Record record = null;
+        while ((record = lineReceiver.getFromReader()) != null) {
+            // 校验列数量是否符合预期
+            if (record.getColumnNumber() != numberOfColumnsToRead) {
+                throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.ILLEGAL_VALUE,
+                        "数据源给出的列数量[" + record.getColumnNumber() + "]与您配置中的列数量[" + numberOfColumnsToRead +
+                                "]不同, 请检查您的配置 或者 联系 Hbase 管理员.");
+            }
+
+            buffer.add(record);
+            // 攒满一个批次即提交,避免单个批次超过配置的batchSize
+            if (buffer.size() >= batchSize) {
+                doBatchUpsert(buffer);
+                buffer.clear();
+            }
+        }
+
+        // 处理剩余的record
+        if (!buffer.isEmpty()) {
+            doBatchUpsert(buffer);
+            buffer.clear();
+        }
+    }
+
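To make the template generation in `createPreparedStatement()` above concrete, here is a minimal, self-contained sketch of the SQL string it builds; the table and column names are illustrative assumptions, not values from the patch. The double quotes around column names preserve the user-configured case instead of letting Phoenix upper-case them.

```java
import java.util.Arrays;
import java.util.List;

public class UpsertTemplateDemo {
    public static void main(String[] args) {
        String fullTableName = "TEST_SCHEMA.TEST";           // assumed schema.table
        List<String> columns = Arrays.asList("UID", "TS");   // assumed column config

        // Quote each column name so Phoenix keeps its case, then join with commas.
        StringBuilder cols = new StringBuilder();
        for (String c : columns) {
            cols.append('"').append(c).append('"').append(',');
        }
        cols.setLength(cols.length() - 1); // drop the trailing comma

        // One '?' placeholder per column, later bound by setupColumn().
        StringBuilder sql = new StringBuilder(
                "upsert into " + fullTableName + " (" + cols + " ) values (");
        for (int i = 0; i < columns.size(); i++) {
            sql.append("?,");
        }
        sql.setLength(sql.length() - 1); // drop the trailing comma
        sql.append(")");

        // Prints: upsert into TEST_SCHEMA.TEST ("UID","TS" ) values (?,?)
        System.out.println(sql);
    }
}
```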
/** + * 批量提交一组数据,如果失败,则尝试一行行提交,如果仍然失败,抛错给用户 + */ + private void doBatchUpsert(List records) throws SQLException { + try { + // 将所有record提交到connection缓存 + for (Record r : records) { + setupStatement(r); + pstmt.addBatch(); + } + + pstmt.executeBatch(); + // 将缓存的数据提交到phoenix + connection.commit(); + pstmt.clearParameters(); + pstmt.clearBatch(); + + } catch (SQLException e) { + LOG.error("Failed batch committing " + records.size() + " records", e); + + // 批量提交失败,则一行行重试,以确定哪一行出错 + connection.rollback(); + HBase20xSQLHelper.closeJdbc(null, pstmt, null); + connection.setAutoCommit(true); + pstmt = createPreparedStatement(); + doSingleUpsert(records); + } catch (Exception e) { + throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.PUT_HBASE_ERROR, e); + } + } + + /** + * 单行提交,将出错的行记录到脏数据中。由脏数据收集模块判断任务是否继续 + */ + private void doSingleUpsert(List records) throws SQLException { + int rowNumber = 0; + for (Record r : records) { + try { + rowNumber ++; + setupStatement(r); + pstmt.executeUpdate(); + } catch (SQLException e) { + //出错了,记录脏数据 + LOG.error("Failed writing to phoenix, rowNumber: " + rowNumber); + this.taskPluginCollector.collectDirtyRecord(r, e); + } + } + } + + private void setupStatement(Record record) throws SQLException { + for (int i = 0; i < numberOfColumnsToWrite; i++) { + Column col = record.getColumn(i); + int sqlType = columnTypes[i]; + // PreparedStatement中的索引从1开始,所以用i+1 + setupColumn(i + 1, sqlType, col); + } + } + + private void setupColumn(int pos, int sqlType, Column col) throws SQLException { + if (col.getRawData() != null) { + switch (sqlType) { + case Types.CHAR: + case Types.VARCHAR: + pstmt.setString(pos, col.asString()); + break; + + case Types.BINARY: + case Types.VARBINARY: + pstmt.setBytes(pos, col.asBytes()); + break; + + case Types.BOOLEAN: + pstmt.setBoolean(pos, col.asBoolean()); + break; + + case Types.TINYINT: + case Constant.TYPE_UNSIGNED_TINYINT: + pstmt.setByte(pos, col.asLong().byteValue()); + break; + + case Types.SMALLINT: + case Constant.TYPE_UNSIGNED_SMALLINT: + pstmt.setShort(pos, col.asLong().shortValue()); + break; + + case Types.INTEGER: + case Constant.TYPE_UNSIGNED_INTEGER: + pstmt.setInt(pos, col.asLong().intValue()); + break; + + case Types.BIGINT: + case Constant.TYPE_UNSIGNED_LONG: + pstmt.setLong(pos, col.asLong()); + break; + + case Types.FLOAT: + pstmt.setFloat(pos, col.asDouble().floatValue()); + break; + + case Types.DOUBLE: + pstmt.setDouble(pos, col.asDouble()); + break; + + case Types.DECIMAL: + pstmt.setBigDecimal(pos, col.asBigDecimal()); + break; + + case Types.DATE: + case Constant.TYPE_UNSIGNED_DATE: + pstmt.setDate(pos, new Date(col.asDate().getTime())); + break; + + case Types.TIME: + case Constant.TYPE_UNSIGNED_TIME: + pstmt.setTime(pos, new Time(col.asDate().getTime())); + break; + + case Types.TIMESTAMP: + case Constant.TYPE_UNSIGNED_TIMESTAMP: + pstmt.setTimestamp(pos, new Timestamp(col.asDate().getTime())); + break; + + default: + throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.ILLEGAL_VALUE, + "不支持您配置的列类型:" + sqlType + ", 请检查您的配置 或者 联系 Hbase 管理员."); + } + } else { + // 没有值,按空值的配置情况处理 + switch (nullModeType){ + case Skip: + // 跳过空值,则不插入该列, + pstmt.setNull(pos, sqlType); + break; + + case Empty: + // 插入"空值",请注意不同类型的空值不同 + // 另外,对SQL来说,空值本身是有值的,这与直接操作HBASE Native API时的空值完全不同 + pstmt.setObject(pos, getEmptyValue(sqlType)); + break; + + default: + // nullMode的合法性在初始化配置的时候已经校验过,这里一定不会出错 + throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.ILLEGAL_VALUE, + "Hbasewriter 不支持该 nullMode 类型: 
" + nullModeType + + ", 目前支持的 nullMode 类型是:" + Arrays.asList(NullModeType.values())); + } + } + } + + /** + * 根据类型获取"空值" + * 值类型的空值都是0,bool是false,String是空字符串 + * @param sqlType sql数据类型,定义于{@link Types} + */ + private Object getEmptyValue(int sqlType) { + switch (sqlType) { + case Types.VARCHAR: + return ""; + + case Types.BOOLEAN: + return false; + + case Types.TINYINT: + case Constant.TYPE_UNSIGNED_TINYINT: + return (byte) 0; + + case Types.SMALLINT: + case Constant.TYPE_UNSIGNED_SMALLINT: + return (short) 0; + + case Types.INTEGER: + case Constant.TYPE_UNSIGNED_INTEGER: + return (int) 0; + + case Types.BIGINT: + case Constant.TYPE_UNSIGNED_LONG: + return (long) 0; + + case Types.FLOAT: + return (float) 0.0; + + case Types.DOUBLE: + return (double) 0.0; + + case Types.DECIMAL: + return new BigDecimal(0); + + case Types.DATE: + case Constant.TYPE_UNSIGNED_DATE: + return new Date(0); + + case Types.TIME: + case Constant.TYPE_UNSIGNED_TIME: + return new Time(0); + + case Types.TIMESTAMP: + case Constant.TYPE_UNSIGNED_TIMESTAMP: + return new Timestamp(0); + + case Types.BINARY: + case Types.VARBINARY: + return new byte[0]; + + default: + throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.ILLEGAL_VALUE, + "不支持您配置的列类型:" + sqlType + ", 请检查您的配置 或者 联系 Hbase 管理员."); + } + } +} diff --git a/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/Key.java b/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/Key.java new file mode 100644 index 00000000..7e93cca0 --- /dev/null +++ b/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/Key.java @@ -0,0 +1,36 @@ +package com.alibaba.datax.plugin.writer.hbase20xsqlwriter; + +public class Key { + + /** + * 【必选】writer要写入的表的表名 + */ + public final static String TABLE = "table"; + /** + * 【必选】writer要写入哪些列 + */ + public final static String COLUMN = "column"; + /** + * 【必选】Phoenix QueryServer服务地址 + */ + public final static String QUERYSERVER_ADDRESS = "queryServerAddress"; + /** + * 【可选】序列化格式,默认为PROTOBUF + */ + public static final String SERIALIZATION_NAME = "serialization"; + + /** + * 【可选】批量写入的最大行数,默认100行 + */ + public static final String BATCHSIZE = "batchSize"; + + /** + * 【可选】遇到空值默认跳过 + */ + public static final String NULLMODE = "nullMode"; + /** + * 【可选】Phoenix表所属schema,默认为空 + */ + public static final String SCHEMA = "schema"; + +} diff --git a/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/NullModeType.java b/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/NullModeType.java new file mode 100644 index 00000000..788e6345 --- /dev/null +++ b/hbase20xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase20xsqlwriter/NullModeType.java @@ -0,0 +1,32 @@ +package com.alibaba.datax.plugin.writer.hbase20xsqlwriter; + +import com.alibaba.datax.common.exception.DataXException; + +import java.util.Arrays; + +public enum NullModeType { + Skip("skip"), + Empty("empty") + ; + + private String mode; + + + NullModeType(String mode) { + this.mode = mode.toLowerCase(); + } + + public String getMode() { + return mode; + } + + public static NullModeType getByTypeName(String modeName) { + for (NullModeType modeType : values()) { + if (modeType.mode.equalsIgnoreCase(modeName)) { + return modeType; + } + } + throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.ILLEGAL_VALUE, + "Hbasewriter 不支持该 nullMode 类型:" + modeName + ", 目前支持的 nullMode 类型是:" + Arrays.asList(values())); + } +} diff --git 
a/hbase20xsqlwriter/src/main/resources/plugin.json b/hbase20xsqlwriter/src/main/resources/plugin.json new file mode 100755 index 00000000..91b7069f --- /dev/null +++ b/hbase20xsqlwriter/src/main/resources/plugin.json @@ -0,0 +1,7 @@ +{ + "name": "hbase20xsqlwriter", + "class": "com.alibaba.datax.plugin.writer.hbase20xsqlwriter.HBase20xSQLWriter", + "description": "useScene: prod. mechanism: use hbase sql UPSERT to put data, index tables will be updated too.", + "developer": "bake" +} + diff --git a/hbase20xsqlwriter/src/main/resources/plugin_job_template.json b/hbase20xsqlwriter/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..2cf634b8 --- /dev/null +++ b/hbase20xsqlwriter/src/main/resources/plugin_job_template.json @@ -0,0 +1,13 @@ + { + "name": "hbase20xsqlwriter", + "parameter": { + "queryServerAddress": "", + "table": "", + "serialization": "PROTOBUF", + "column": [ + ], + "batchSize": "100", + "nullMode": "skip", + "schema": "" + } +} \ No newline at end of file diff --git a/package.xml b/package.xml index ae07dd08..47e277b8 100755 --- a/package.xml +++ b/package.xml @@ -308,5 +308,19 @@ datax + + hbase20xsqlreader/target/datax/ + + **/*.* + + datax + + + hbase20xsqlwriter/target/datax/ + + **/*.* + + datax + diff --git a/pom.xml b/pom.xml index f8bc7434..f4d197ba 100755 --- a/pom.xml +++ b/pom.xml @@ -89,6 +89,8 @@ plugin-rdbms-util plugin-unstructured-storage-util + hbase20xsqlreader + hbase20xsqlwriter
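Finally, a hedged end-to-end sketch of a Phoenix-to-Phoenix job that wires the two new plugins together; all addresses, table names, and column names are placeholders, and the reader's column order must match the writer's, as §5 of the writer doc explains:

```json
{
    "job": {
        "setting": {
            "speed": {
                "channel": 3
            }
        },
        "content": [
            {
                "reader": {
                    "name": "hbase20xsqlreader",
                    "parameter": {
                        "queryServerAddress": "http://127.0.0.1:8765",
                        "table": "SOURCE_TABLE",
                        "column": ["ID", "NAME"],
                        "splitKey": "ID"
                    }
                },
                "writer": {
                    "name": "hbase20xsqlwriter",
                    "parameter": {
                        "queryServerAddress": "http://127.0.0.1:8765",
                        "table": "TARGET_TABLE",
                        "column": ["ID", "NAME"],
                        "batchSize": "256",
                        "nullMode": "skip"
                    }
                }
            }
        ]
    }
}
```

Saved as, say, `phoenix2phoenix.json`, the job runs with the standard DataX launcher, `python {DATAX_HOME}/bin/datax.py ./phoenix2phoenix.json`, once both plugins are built into the DataX distribution via the new `package.xml` entries.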