Add reader and writer plugins for Phoenix 5.x

bake.snn 2019-03-11 11:19:41 +08:00
parent d4d1ea6a15
commit db0333b971
25 changed files with 2072 additions and 0 deletions


@ -0,0 +1,164 @@
# hbase20xsqlreader Plugin Documentation
___
## 1 Quick Introduction
The hbase20xsqlreader plugin reads data from Phoenix (SQL on HBase). The supported versions are HBase 2.x and Phoenix 5.x.
## 2 How It Works
In short, hbase20xsqlreader connects to Phoenix QueryServer through the Phoenix thin client, builds SELECT statements from the user's configuration, sends them to QueryServer to read the HBase data, assembles the returned results into an abstract dataset using DataX's internal data types, and finally passes that dataset to the downstream writer.
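The sketch below is only an illustration of the thin-client access path the reader uses internally; the driver class and JDBC URL format come from the plugin's `Constant` class later in this commit, while the QueryServer address and the `TEST` table are simply the values from the sample job in section 3.1.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PhoenixThinClientReadDemo {
    public static void main(String[] args) throws Exception {
        // Same thin-client driver and URL template the plugin uses (see Constant.java below).
        Class.forName("org.apache.phoenix.queryserver.client.Driver");
        String url = "jdbc:phoenix:thin:url=http://127.0.0.1:8765;serialization=PROTOBUF";
        try (Connection conn = DriverManager.getConnection(url);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT ID, NAME FROM TEST")) {
            while (rs.next()) {
                // The plugin converts each row into DataX columns and sends it to the writer.
                System.out.println(rs.getLong("ID") + "\t" + rs.getString("NAME"));
            }
        }
    }
}
```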
## 3 Features
### 3.1 Sample Configuration
* A job that extracts data from Phoenix and prints it to the local console:
```
{
    "job": {
        "content": [
            {
                "reader": {
                    "name": "hbase20xsqlreader",  //use the hbase20xsqlreader plugin
                    "parameter": {
                        "queryServerAddress": "http://127.0.0.1:8765",  //Phoenix QueryServer address
                        "serialization": "PROTOBUF",  //serialization format used by QueryServer
                        "table": "TEST",  //table to read
                        "column": ["ID", "NAME"],  //columns to read
                        "splitKey": "ID"  //split column; must be a primary-key column of the table
                    }
                },
                "writer": {
                    "name": "streamwriter",
                    "parameter": {
                        "encoding": "UTF-8",
                        "print": true
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": "3"
            }
        }
    }
}
```
### 3.2 Parameter Description
* **queryServerAddress**
    * Description: hbase20xsqlreader connects to Phoenix QueryServer through the Phoenix thin client, so the corresponding QueryServer address must be provided here.
    * Required: yes <br />
    * Default: none <br />
* **serialization**
    * Description: serialization protocol used by QueryServer.
    * Required: no <br />
    * Default: PROTOBUF <br />
* **table**
    * Description: name of the table to read.
    * Required: yes <br />
    * Default: none <br />
* **schema**
    * Description: schema the table belongs to.
    * Required: no <br />
    * Default: none <br />
* **column**
    * Description: the set of columns to read from the Phoenix table, given as a JSON array of column names. An empty value means all columns are read.
    * Required: no <br />
    * Default: all columns <br />
* **splitKey**
    * Description: split column used to read the table in parallel. There are two splitting strategies: 1. split evenly into the configured number of channels by the column's minimum and maximum values (only integer and string split columns are supported); 2. split according to the configured splitPoint values.
    * Required: yes <br />
    * Default: none <br />
* **splitPoint**
    * Description: because splitting by the split column's min/max values cannot avoid data hotspots, users may instead specify split points that match the data distribution. It is recommended to choose split points from the Regions' start and end keys so that each query hits a single Region (see the sketch after this parameter list).
    * Required: no <br />
    * Default: none <br />
* **where**
    * Description: an optional filter condition added to the table query; every split carries this condition.
    * Required: no <br />
    * Default: none <br />
* **querySql**
    * Description: one or more query statements may be specified, but their column types and counts must be identical. Users can supply hand-written single-table queries or multi-table joins. When this parameter is set, only queryServerAddress still has to be configured; the remaining parameters are ignored and may be omitted.
    * Required: no <br />
    * Default: none <br />
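For the `splitPoint` option above, the sketch below shows how configured split points become one WHERE range per parallel read task; a string-typed split column is assumed for simplicity (the plugin quotes values according to the column's actual SQL type).

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

public class SplitPointRangesDemo {
    // Turn user-supplied split points into one WHERE range per parallel read task.
    static List<String> buildRanges(String splitKey, List<String> points) {
        List<String> ranges = new ArrayList<String>();
        for (int i = 0; i <= points.size(); i++) {
            if (i == 0) {
                ranges.add(splitKey + " <= '" + points.get(i) + "'");
            } else if (i == points.size()) {
                ranges.add(splitKey + " > '" + points.get(i - 1) + "'");
            } else {
                ranges.add(splitKey + " > '" + points.get(i - 1) + "' AND "
                        + splitKey + " <= '" + points.get(i) + "'");
            }
        }
        return ranges;
    }

    public static void main(String[] args) {
        // Three split points produce four ranges, i.e. four parallel SELECTs against QueryServer.
        for (String range : buildRanges("ID", Arrays.asList("1000", "2000", "3000"))) {
            System.out.println("... WHERE " + range);
        }
    }
}
```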
### 3.3 Type Conversion
hbase20xsqlreader currently supports most Phoenix data types, but a few individual types are still unsupported, so please check your column types.
The table below lists the type conversions hbase20xsqlreader performs for Phoenix types:

| DataX Internal Type | Phoenix Data Type |
| -------- | ----- |
| String | CHAR, VARCHAR |
| Bytes | BINARY, VARBINARY |
| Bool | BOOLEAN |
| Long | INTEGER, TINYINT, SMALLINT, BIGINT |
| Double | FLOAT, DECIMAL, DOUBLE |
| Date | DATE, TIME, TIMESTAMP |
## 4 Performance Report
## 5 Constraints and Limitations
* Only a single column can be used as the split column, and it must be a primary-key column of the table
* If splitPoint is not set, automatic splitting is used; in that case only integer and string split columns are supported
* Table names, schema names, and column names are case-sensitive and must match the actual case of the Phoenix table
* Data can only be read through Phoenix QueryServer, so the QueryServer service must be running in your Phoenix deployment before this plugin can be used
## 6 FAQ
***

hbase20xsqlreader/pom.xml

@ -0,0 +1,116 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hbase20xsqlreader</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<phoenix.version>5.0.0-HBase-2.0</phoenix.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-queryserver</artifactId>
<version>${phoenix.version}</version>
<exclusions>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.0.44-beta</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-service-face</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>plugin-rdbms-util</artifactId>
<version>0.0.1-SNAPSHOT</version>
<scope>compile</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>


@ -0,0 +1,35 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/reader/hbase20xsqlreader</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>hbase20xsqlreader-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/reader/hbase20xsqlreader</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/reader/hbase20xsqlreader/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>


@ -0,0 +1,28 @@
package com.alibaba.datax.plugin.reader.hbase20xsqlreader;
public class Constant {
public static final String PK_TYPE = "pkType";
public static final Object PK_TYPE_STRING = "pkTypeString";
public static final Object PK_TYPE_LONG = "pkTypeLong";
public static final String DEFAULT_SERIALIZATION = "PROTOBUF";
public static final String CONNECT_STRING_TEMPLATE = "jdbc:phoenix:thin:url=%s;serialization=%s";
public static final String CONNECT_DRIVER_STRING = "org.apache.phoenix.queryserver.client.Driver";
public static final String SELECT_COLUMNS_TEMPLATE = "SELECT COLUMN_NAME, COLUMN_FAMILY FROM SYSTEM.CATALOG WHERE TABLE_NAME='%s' AND COLUMN_NAME IS NOT NULL";
public static String QUERY_SQL_TEMPLATE_WITHOUT_WHERE = "select %s from %s ";
public static String QUERY_SQL_TEMPLATE = "select %s from %s where (%s)";
public static String QUERY_MIN_MAX_TEMPLATE = "SELECT MIN(%s),MAX(%s) FROM %s";
public static String QUERY_COLUMN_TYPE_TEMPLATE = "SELECT %s FROM %s LIMIT 1";
public static String QUERY_SQL_PER_SPLIT = "querySqlPerSplit";
}


@ -0,0 +1,403 @@
package com.alibaba.datax.plugin.reader.hbase20xsqlreader;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.plugin.rdbms.util.DBUtilErrorCode;
import com.alibaba.datax.plugin.rdbms.util.RdbmsRangeSplitWrap;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigInteger;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
public class HBase20SQLReaderHelper {
private static final Logger LOG = LoggerFactory.getLogger(HBase20SQLReaderHelper.class);
private Configuration configuration;
private Connection connection;
private List<String> querySql;
private String fullTableName;
private List<String> columnNames;
private String splitKey;
private List<Object> splitPoints;
public HBase20SQLReaderHelper (Configuration configuration) {
this.configuration = configuration;
}
/**
* 校验配置参数是否正确
*/
public void validateParameter() {
// queryserver地址必须配置
String queryServerAddress = configuration.getNecessaryValue(Key.QUERYSERVER_ADDRESS,
HBase20xSQLReaderErrorCode.REQUIRED_VALUE);
String serialization = configuration.getString(Key.SERIALIZATION_NAME, Constant.DEFAULT_SERIALIZATION);
connection = getConnection(queryServerAddress, serialization);
//判断querySql是否配置如果配置则table配置可为空否则table必须配置
querySql = configuration.getList(Key.QUERY_SQL, String.class);
if (querySql == null || querySql.isEmpty()) {
LOG.info("Split according to splitKey or split points.");
String schema = configuration.getString(Key.SCHEMA, null);
String tableName = configuration.getNecessaryValue(Key.TABLE, HBase20xSQLReaderErrorCode.REQUIRED_VALUE);
if (schema != null && !schema.isEmpty()) {
fullTableName = schema + "." + tableName;
} else {
fullTableName = tableName;
}
// 如果列名未配置默认读取全部列*
columnNames = configuration.getList(Key.COLUMN, String.class);
splitKey = configuration.getString(Key.SPLIT_KEY, null);
splitPoints = configuration.getList(Key.SPLIT_POINT);
checkTable(schema, tableName);
dealWhere();
} else {
// 用户指定querySql切分不做处理根据给定sql读取数据即可
LOG.info("Split according to query sql.");
}
}
public Connection getConnection(String queryServerAddress, String serialization) {
String url = String.format(Constant.CONNECT_STRING_TEMPLATE, queryServerAddress, serialization);
LOG.debug("Connecting to QueryServer [" + url + "] ...");
Connection conn;
try {
Class.forName(Constant.CONNECT_DRIVER_STRING);
conn = DriverManager.getConnection(url);
conn.setAutoCommit(false);
} catch (Throwable e) {
throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.GET_QUERYSERVER_CONNECTION_ERROR,
"无法连接QueryServer配置不正确或服务未启动请检查配置和服务状态或者联系HBase管理员.", e);
}
LOG.debug("Connected to QueryServer successfully.");
return conn;
}
/**
* 检查表名列名和切分列是否存在
*/
public void checkTable(String schema, String tableName) {
Statement statement = null;
ResultSet resultSet = null;
try {
statement = connection.createStatement();
String selectSql = String.format(Constant.SELECT_COLUMNS_TEMPLATE, tableName);
// 处理schema不为空情况
if (schema == null || schema.isEmpty()) {
selectSql = selectSql + " AND TABLE_SCHEM IS NULL";
} else {
selectSql = selectSql + " AND TABLE_SCHEM = '" + schema + "'";
}
resultSet = statement.executeQuery(selectSql);
List<String> primaryColumnNames = new ArrayList<String>();
List<String> allColumnName = new ArrayList<String>();
while (resultSet.next()) {
String columnName = resultSet.getString(1);
allColumnName.add(columnName);
// 列族为空表示该列为主键列
if (resultSet.getString(2) == null) {
primaryColumnNames.add(columnName);
}
}
if (columnNames != null && !columnNames.isEmpty()) {
for (String columnName : columnNames) {
if (!allColumnName.contains(columnName)) {
// 用户配置的列名在元数据中不存在
throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.ILLEGAL_VALUE,
"您配置的列" + columnName + "在表" + tableName + "的元数据中不存在请检查您的配置或者联系HBase管理员.");
}
}
} else {
columnNames = allColumnName;
configuration.set(Key.COLUMN, allColumnName);
}
if (splitKey != null) {
// 切分列必须是主键列否则会严重影响读取性能
if (!primaryColumnNames.contains(splitKey)) {
throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.ILLEGAL_VALUE,
"您配置的切分列" + splitKey + "不是表" + tableName + "的主键请检查您的配置或者联系HBase管理员.");
}
}
} catch (SQLException e) {
throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.GET_PHOENIX_TABLE_ERROR,
"获取表" + tableName + "信息失败请检查您的集群和表状态或者联系HBase管理员.", e);
} finally {
closeJdbc(null, statement, resultSet);
}
}
public void closeJdbc(Connection connection, Statement statement, ResultSet resultSet) {
try {
if (resultSet != null) {
resultSet.close();
}
if (statement != null) {
statement.close();
}
if (connection != null) {
connection.close();
}
} catch (SQLException e) {
LOG.warn("数据库连接关闭异常.", HBase20xSQLReaderErrorCode.CLOSE_PHOENIX_CONNECTION_ERROR, e);
}
}
public void dealWhere() {
String where = configuration.getString(Key.WHERE, null);
if(StringUtils.isNotBlank(where)) {
String whereImprove = where.trim();
if(whereImprove.endsWith(";") || whereImprove.endsWith("；")) {
whereImprove = whereImprove.substring(0,whereImprove.length()-1);
}
configuration.set(Key.WHERE, whereImprove);
}
}
/**
* 对表进行切分
*/
public List<Configuration> doSplit(int adviceNumber) {
List<Configuration> pluginParams = new ArrayList<Configuration>();
List<String> rangeList;
String where = configuration.getString(Key.WHERE);
boolean hasWhere = StringUtils.isNotBlank(where);
if (querySql == null || querySql.isEmpty()) {
// 如果splitPoints为空则根据splitKey自动切分不过这种切分方式无法保证数据均分且只支持整形和字符型列
if (splitPoints == null || splitPoints.isEmpty()) {
LOG.info("Split according to min and max value of splitColumn...");
Pair<Object, Object> minMaxPK = getPkRange(configuration);
if (null == minMaxPK) {
throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.ILLEGAL_SPLIT_PK,
"根据切分主键切分表失败. DataX仅支持切分主键为一个,并且类型为整数或者字符串类型. " +
"请尝试使用其他的切分主键或者联系 HBase管理员 进行处理.");
}
if (null == minMaxPK.getLeft() || null == minMaxPK.getRight()) {
// 切分后获取到的start/end Null 的情况
pluginParams.add(configuration);
return pluginParams;
}
boolean isStringType = Constant.PK_TYPE_STRING.equals(configuration
.getString(Constant.PK_TYPE));
boolean isLongType = Constant.PK_TYPE_LONG.equals(configuration
.getString(Constant.PK_TYPE));
if (isStringType) {
rangeList = RdbmsRangeSplitWrap.splitAndWrap(
String.valueOf(minMaxPK.getLeft()),
String.valueOf(minMaxPK.getRight()), adviceNumber,
splitKey, "'", null);
} else if (isLongType) {
rangeList = RdbmsRangeSplitWrap.splitAndWrap(
new BigInteger(minMaxPK.getLeft().toString()),
new BigInteger(minMaxPK.getRight().toString()),
adviceNumber, splitKey);
} else {
throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.ILLEGAL_SPLIT_PK,
"您配置的切分主键(splitPk) 类型 DataX 不支持. DataX 仅支持切分主键为一个,并且类型为整数或者字符串类型. " +
"请尝试使用其他的切分主键或者联系HBase管理员进行处理.");
}
} else {
LOG.info("Split according to splitPoints...");
// 根据指定splitPoints进行切分
rangeList = buildSplitRange();
}
String tempQuerySql;
if (null != rangeList && !rangeList.isEmpty()) {
for (String range : rangeList) {
Configuration tempConfig = configuration.clone();
tempQuerySql = buildQuerySql(columnNames, fullTableName, where)
+ (hasWhere ? " and " : " where ") + range;
LOG.info("Query SQL: " + tempQuerySql);
tempConfig.set(Constant.QUERY_SQL_PER_SPLIT, tempQuerySql);
pluginParams.add(tempConfig);
}
} else {
Configuration tempConfig = configuration.clone();
tempQuerySql = buildQuerySql(columnNames, fullTableName, where)
+ (hasWhere ? " and " : " where ")
+ String.format(" %s IS NOT NULL", splitKey);
LOG.info("Query SQL: " + tempQuerySql);
tempConfig.set(Constant.QUERY_SQL_PER_SPLIT, tempQuerySql);
pluginParams.add(tempConfig);
}
} else {
// 指定querySql不需要切分
for (String sql : querySql) {
Configuration tempConfig = configuration.clone();
tempConfig.set(Constant.QUERY_SQL_PER_SPLIT, sql);
pluginParams.add(tempConfig);
}
}
return pluginParams;
}
public static String buildQuerySql(List<String> columnNames, String table,
String where) {
String querySql;
StringBuilder columnBuilder = new StringBuilder();
for (String columnName : columnNames) {
columnBuilder.append(columnName).append(",");
}
columnBuilder.setLength(columnBuilder.length() -1);
if (StringUtils.isBlank(where)) {
querySql = String.format(Constant.QUERY_SQL_TEMPLATE_WITHOUT_WHERE,
columnBuilder.toString(), table);
} else {
querySql = String.format(Constant.QUERY_SQL_TEMPLATE, columnBuilder.toString(),
table, where);
}
return querySql;
}
private List<String> buildSplitRange() {
String getSplitKeyTypeSQL = String.format(Constant.QUERY_COLUMN_TYPE_TEMPLATE, splitKey, fullTableName);
Statement statement = null;
ResultSet resultSet = null;
List<String> splitConditions = new ArrayList<String>();
try {
statement = connection.createStatement();
resultSet = statement.executeQuery(getSplitKeyTypeSQL);
ResultSetMetaData rsMetaData = resultSet.getMetaData();
int type = rsMetaData.getColumnType(1);
String symbol = "%s";
switch (type) {
case Types.CHAR:
case Types.VARCHAR:
symbol = "'%s'";
break;
case Types.DATE:
symbol = "TO_DATE('%s')";
break;
case Types.TIME:
symbol = "TO_TIME('%s')";
break;
case Types.TIMESTAMP:
symbol = "TO_TIMESTAMP('%s')";
break;
case Types.BINARY:
case Types.VARBINARY:
case Types.ARRAY:
throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.ILLEGAL_SPLIT_PK,
"切分列类型为" + rsMetaData.getColumnTypeName(1) + ",暂不支持该类型字段作为切分列。");
}
String splitCondition = null;
for (int i = 0; i <= splitPoints.size(); i++) {
if (i == 0) {
splitCondition = splitKey + " <= " + String.format(symbol, splitPoints.get(i));
} else if (i == splitPoints.size()) {
splitCondition = splitKey + " > " + String.format(symbol, splitPoints.get(i - 1));
} else {
splitCondition = splitKey + " > " + String.format(symbol, splitPoints.get(i - 1)) +
" AND " + splitKey + " <= " + String.format(symbol, splitPoints.get(i));
}
splitConditions.add(splitCondition);
}
return splitConditions;
} catch (SQLException e) {
throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.GET_TABLE_COLUMNTYPE_ERROR,
"获取切分列类型失败请检查服务或给定表和切分列是否正常或者联系HBase管理员进行处理。", e);
} finally {
closeJdbc(null, statement, resultSet);
}
}
private Pair<Object, Object> getPkRange(Configuration configuration) {
String pkRangeSQL = String.format(Constant.QUERY_MIN_MAX_TEMPLATE, splitKey, splitKey, fullTableName);
String where = configuration.getString(Key.WHERE);
if (StringUtils.isNotBlank(where)) {
pkRangeSQL = String.format("%s WHERE (%s AND %s IS NOT NULL)",
pkRangeSQL, where, splitKey);
}
Statement statement = null;
ResultSet resultSet = null;
Pair<Object, Object> minMaxPK = null;
try {
statement = connection.createStatement();
resultSet = statement.executeQuery(pkRangeSQL);
ResultSetMetaData rsMetaData = resultSet.getMetaData();
if (isPKTypeValid(rsMetaData)) {
if (isStringType(rsMetaData.getColumnType(1))) {
if(configuration != null) {
configuration
.set(Constant.PK_TYPE, Constant.PK_TYPE_STRING);
}
if (resultSet.next()) {
minMaxPK = new ImmutablePair<Object, Object>(
resultSet.getString(1), resultSet.getString(2));
}
} else if (isLongType(rsMetaData.getColumnType(1))) {
if(configuration != null) {
configuration.set(Constant.PK_TYPE, Constant.PK_TYPE_LONG);
}
if (resultSet.next()) {
minMaxPK = new ImmutablePair<Object, Object>(
resultSet.getLong(1), resultSet.getLong(2));
}
} else {
throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.ILLEGAL_SPLIT_PK,
"您配置的DataX切分主键(splitPk)有误. 因为您配置的切分主键(splitPk) 类型 DataX 不支持. " +
"DataX 仅支持切分主键为一个,并且类型为整数或者字符串类型. 请尝试使用其他的切分主键或者联系HBASE管理员进行处理.");
}
} else {
throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.ILLEGAL_SPLIT_PK,
"您配置的DataX切分主键(splitPk)有误. 因为您配置的切分主键(splitPk) 类型 DataX 不支持. " +
"DataX 仅支持切分主键为一个,并且类型为整数或者字符串类型. 请尝试使用其他的切分主键或者联系HBASE管理员进行处理.");
}
} catch (SQLException e) {
throw DataXException.asDataXException(HBase20xSQLReaderErrorCode.ILLEGAL_SPLIT_PK, e);
} finally {
closeJdbc(null, statement, resultSet);
}
return minMaxPK;
}
private static boolean isPKTypeValid(ResultSetMetaData rsMetaData) {
boolean ret = false;
try {
int minType = rsMetaData.getColumnType(1);
int maxType = rsMetaData.getColumnType(2);
boolean isNumberType = isLongType(minType);
boolean isStringType = isStringType(minType);
if (minType == maxType && (isNumberType || isStringType)) {
ret = true;
}
} catch (Exception e) {
throw DataXException.asDataXException(DBUtilErrorCode.ILLEGAL_SPLIT_PK,
"DataX获取切分主键(splitPk)字段类型失败. 该错误通常是系统底层异常导致. 请联系旺旺:askdatax或者DBA处理.");
}
return ret;
}
private static boolean isLongType(int type) {
boolean isValidLongType = type == Types.BIGINT || type == Types.INTEGER
|| type == Types.SMALLINT || type == Types.TINYINT;
return isValidLongType;
}
private static boolean isStringType(int type) {
return type == Types.CHAR || type == Types.NCHAR
|| type == Types.VARCHAR || type == Types.LONGVARCHAR
|| type == Types.NVARCHAR;
}
}


@ -0,0 +1,53 @@
package com.alibaba.datax.plugin.reader.hbase20xsqlreader;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import java.util.List;
public class HBase20xSQLReader extends Reader {
public static class Job extends Reader.Job {
private Configuration originalConfig;
private HBase20SQLReaderHelper readerHelper;
@Override
public void init() {
this.originalConfig = this.getPluginJobConf();
this.readerHelper = new HBase20SQLReaderHelper(this.originalConfig);
readerHelper.validateParameter();
}
@Override
public List<Configuration> split(int adviceNumber) {
return readerHelper.doSplit(adviceNumber);
}
@Override
public void destroy() {
// do nothing
}
}
public static class Task extends Reader.Task {
private Configuration readerConfig;
private HBase20xSQLReaderTask hbase20xSQLReaderTask;
@Override
public void init() {
this.readerConfig = super.getPluginJobConf();
hbase20xSQLReaderTask = new HBase20xSQLReaderTask(readerConfig, super.getTaskGroupId(), super.getTaskId());
}
@Override
public void startRead(RecordSender recordSender) {
hbase20xSQLReaderTask.readRecord(recordSender);
}
@Override
public void destroy() {
// do nothing
}
}
}


@ -0,0 +1,39 @@
package com.alibaba.datax.plugin.reader.hbase20xsqlreader;
import com.alibaba.datax.common.spi.ErrorCode;
public enum HBase20xSQLReaderErrorCode implements ErrorCode {
REQUIRED_VALUE("Hbasewriter-00", "您缺失了必须填写的参数值."),
ILLEGAL_VALUE("Hbasewriter-01", "您填写的参数值不合法."),
GET_QUERYSERVER_CONNECTION_ERROR("Hbasewriter-02", "获取QueryServer连接时出错."),
GET_PHOENIX_TABLE_ERROR("Hbasewriter-03", "获取 Phoenix table时出错."),
GET_TABLE_COLUMNTYPE_ERROR("Hbasewriter-05", "获取表列类型时出错."),
CLOSE_PHOENIX_CONNECTION_ERROR("Hbasewriter-06", "关闭JDBC连接时时出错."),
ILLEGAL_SPLIT_PK("Hbasewriter-07", "非法splitKey配置."),
PHOENIX_COLUMN_TYPE_CONVERT_ERROR("Hbasewriter-08", "phoenix的列类型转换错误."),
QUERY_DATA_ERROR("Hbasewriter-09", "读取Phoenix数据时发生异常."),
;
private final String code;
private final String description;
private HBase20xSQLReaderErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s].", this.code, this.description);
}
}


@ -0,0 +1,121 @@
package com.alibaba.datax.plugin.reader.hbase20xsqlreader;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.statistics.PerfRecord;
import com.alibaba.datax.common.util.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.sql.*;
public class HBase20xSQLReaderTask {
private static final Logger LOG = LoggerFactory.getLogger(HBase20xSQLReaderTask.class);
private Configuration readerConfig;
private int taskGroupId = -1;
private int taskId=-1;
public HBase20xSQLReaderTask(Configuration config, int taskGroupId, int taskId) {
this.readerConfig = config;
this.taskGroupId = taskGroupId;
this.taskId = taskId;
}
public void readRecord(RecordSender recordSender) {
String querySql = readerConfig.getString(Constant.QUERY_SQL_PER_SPLIT);
LOG.info("Begin to read records by SQL: [{}].", querySql);
HBase20SQLReaderHelper helper = new HBase20SQLReaderHelper(readerConfig);
Connection conn = helper.getConnection(readerConfig.getString(Key.QUERYSERVER_ADDRESS),
readerConfig.getString(Key.SERIALIZATION_NAME, Constant.DEFAULT_SERIALIZATION));
Statement statement = null;
ResultSet resultSet = null;
try {
long rsNextUsedTime = 0;
long lastTime = System.nanoTime();
statement = conn.createStatement();
// 统计查询时间
PerfRecord queryPerfRecord = new PerfRecord(taskGroupId,taskId, PerfRecord.PHASE.SQL_QUERY);
queryPerfRecord.start();
resultSet = statement.executeQuery(querySql);
ResultSetMetaData meta = resultSet.getMetaData();
int columnNum = meta.getColumnCount();
// 统计的result_Next时间
PerfRecord allResultPerfRecord = new PerfRecord(taskGroupId, taskId, PerfRecord.PHASE.RESULT_NEXT_ALL);
allResultPerfRecord.start();
while (resultSet.next()) {
Record record = recordSender.createRecord();
rsNextUsedTime += (System.nanoTime() - lastTime);
for (int i = 1; i <= columnNum; i++) {
Column column = this.convertPhoenixValueToDataxColumn(meta.getColumnType(i), resultSet.getObject(i));
record.addColumn(column);
}
lastTime = System.nanoTime();
recordSender.sendToWriter(record);
}
allResultPerfRecord.end(rsNextUsedTime);
LOG.info("Finished reading records by SQL: [{}].", querySql);
} catch (SQLException e) {
throw DataXException.asDataXException(
HBase20xSQLReaderErrorCode.QUERY_DATA_ERROR, "查询Phoenix数据出现异常请检查服务状态或与HBase管理员联系", e);
} finally {
helper.closeJdbc(conn, statement, resultSet);
}
}
private Column convertPhoenixValueToDataxColumn(int sqlType, Object value) {
Column column;
switch (sqlType) {
case Types.CHAR:
case Types.VARCHAR:
column = new StringColumn((String) value);
break;
case Types.BINARY:
case Types.VARBINARY:
column = new BytesColumn((byte[]) value);
break;
case Types.BOOLEAN:
column = new BoolColumn((Boolean) value);
break;
case Types.INTEGER:
column = new LongColumn((Integer) value);
break;
case Types.TINYINT:
column = new LongColumn(((Byte) value).longValue());
break;
case Types.SMALLINT:
column = new LongColumn(((Short) value).longValue());
break;
case Types.BIGINT:
column = new LongColumn((Long) value);
break;
case Types.FLOAT:
column = new DoubleColumn((Float.valueOf(value.toString())));
break;
case Types.DECIMAL:
column = new DoubleColumn((BigDecimal)value);
break;
case Types.DOUBLE:
column = new DoubleColumn((Double) value);
break;
case Types.DATE:
column = new DateColumn((Date) value);
break;
case Types.TIME:
column = new DateColumn((Time) value);
break;
case Types.TIMESTAMP:
column = new DateColumn((Timestamp) value);
break;
default:
throw DataXException.asDataXException(
HBase20xSQLReaderErrorCode.PHOENIX_COLUMN_TYPE_CONVERT_ERROR, "遇到不可识别的phoenix类型" + "sqlType :" + sqlType);
}
return column;
}
}


@ -0,0 +1,40 @@
package com.alibaba.datax.plugin.reader.hbase20xsqlreader;
public class Key {
/**
* 必选reader要读取的表的表名
*/
public final static String TABLE = "table";
/**
* 必选reader要读取哪些列
*/
public final static String COLUMN = "column";
/**
* 必选Phoenix QueryServer服务地址
*/
public final static String QUERYSERVER_ADDRESS = "queryServerAddress";
/**
* 可选序列化格式默认为PROTOBUF
*/
public static final String SERIALIZATION_NAME = "serialization";
/**
* 可选Phoenix表所属schema默认为空
*/
public static final String SCHEMA = "schema";
/**
* 可选读取数据时切分列
*/
public static final String SPLIT_KEY = "splitKey";
/**
* 可选读取数据时切分点
*/
public static final String SPLIT_POINT = "splitPoint";
/**
* 可选读取数据过滤条件配置
*/
public static final String WHERE = "where";
/**
* 可选查询语句配置
*/
public static final String QUERY_SQL = "querySql";
}


@ -0,0 +1,7 @@
{
"name": "hbase20xsqlreader",
"class": "com.alibaba.datax.plugin.reader.hbase20xsqlreader.HBase20xSQLReader",
"description": "useScene: prod. mechanism: read data from phoenix through queryserver.",
"developer": "bake"
}


@ -0,0 +1,13 @@
{
    "name": "hbase20xsqlreader",
    "parameter": {
        "queryServerAddress": "",
        "serialization": "PROTOBUF",
        "schema": "",
        "table": "TABLE1",
        "column": ["ID", "NAME"],
        "splitKey": "rowkey",
        "splitPoint": [],
        "where": ""
    }
}


@ -0,0 +1,164 @@
# HBase20xsqlwriter Plugin Documentation
## 1. Quick Introduction
HBase20xsqlwriter bulk-loads data into SQL tables on HBase (Phoenix). Because Phoenix encodes data into the rowkey, writing through the native HBase API requires manual data conversion, which is tedious and error-prone. This plugin instead writes data to Phoenix tables directly through SQL.
Under the hood, it executes UPSERT statements through the Phoenix QueryServer thin-client driver to write data into Phoenix.
### 1.1 Supported Features
* Importing data into tables with secondary indexes; all index tables are updated in sync
### 1.2 Limitations
* Requires Phoenix 5.x and HBase 2.x
* Data can only be imported through Phoenix QueryServer, so the QueryServer service must be running in your Phoenix deployment before this plugin can be used
* Truncating existing table data is not supported
* Only tables created through Phoenix are supported; native HBase tables are not
* Importing data with explicit timestamps is not supported
## 2. How It Works
The plugin connects to the Phoenix QueryServer service through the Phoenix thin client and executes UPSERT statements to write data into the table in batches. Because it uses this high-level interface, secondary index tables are kept in sync automatically.
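A minimal sketch of the same write path (thin client, parameterized UPSERT, explicit commit). The driver class and URL format come from the plugin source in this commit; the `EVENT_LOG` table and its columns are placeholders loosely based on the sample job below, not fixed names.

```java
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.PreparedStatement;

public class PhoenixThinClientUpsertDemo {
    public static void main(String[] args) throws Exception {
        Class.forName("org.apache.phoenix.queryserver.client.Driver");
        String url = "jdbc:phoenix:thin:url=http://127.0.0.1:8765;serialization=PROTOBUF";
        try (Connection conn = DriverManager.getConnection(url)) {
            conn.setAutoCommit(false);
            // Column names are double-quoted so Phoenix keeps the configured case.
            String sql = "UPSERT INTO EVENT_LOG (\"UID\", \"EVENTID\", \"CONTENT\") VALUES (?, ?, ?)";
            try (PreparedStatement ps = conn.prepareStatement(sql)) {
                ps.setString(1, "u_0001");
                ps.setString(2, "login");
                ps.setString(3, "hello");
                ps.addBatch();          // one addBatch() per record, up to batchSize rows
                ps.executeBatch();
            }
            conn.commit();              // flush the batch; secondary indexes are updated as well
        }
    }
}
```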
## 3. Configuration
### 3.1 Sample Configuration
```json
{
    "job": {
        "entry": {
            "jvm": "-Xms2048m -Xmx2048m"
        },
        "content": [
            {
                "reader": {
                    "name": "txtfilereader",
                    "parameter": {
                        "path": "/Users/shf/workplace/datax_test/hbase20xsqlwriter/txt/normal.txt",
                        "charset": "UTF-8",
                        "column": [
                            {
                                "index": 0,
                                "type": "String"
                            },
                            {
                                "index": 1,
                                "type": "string"
                            },
                            {
                                "index": 2,
                                "type": "string"
                            },
                            {
                                "index": 3,
                                "type": "string"
                            }
                        ],
                        "fieldDelimiter": ","
                    }
                },
                "writer": {
                    "name": "hbase20xsqlwriter",
                    "parameter": {
                        "batchSize": "100",
                        "column": [
                            "UID",
                            "TS",
                            "EVENTID",
                            "CONTENT"
                        ],
                        "queryServerAddress": "http://127.0.0.1:8765",
                        "nullMode": "skip",
                        "table": "target Phoenix table name (case-sensitive)"
                    }
                }
            }
        ],
        "setting": {
            "speed": {
                "channel": 5
            }
        }
    }
}
```
### 3.2 Parameter Description
* **name**
    * Description: plugin name; must be `hbase20xsqlwriter`
    * Required: yes
    * Default: none
* **table**
    * Description: name of the target table, case-sensitive. Phoenix table names are usually **UPPERCASE**.
    * Required: yes
    * Default: none
* **column**
    * Description: column names, case-sensitive. Phoenix column names are usually **UPPERCASE**.
    * Note: the column order must match the column order produced by the reader exactly.
    * Data types do not need to be specified; the column metadata is fetched from Phoenix automatically.
    * Required: yes
    * Default: none
* **queryServerAddress**
    * Description: Phoenix QueryServer address, required. Format: http://${hostName}:${port}, e.g. http://172.16.34.58:8765
    * Required: yes
    * Default: none
* **serialization**
    * Description: serialization protocol used by QueryServer
    * Required: no
    * Default: PROTOBUF
* **batchSize**
    * Description: maximum number of rows written per batch
    * Required: no
    * Default: 256
* **nullMode**
    * Description: how to handle a column whose value read from the source is null. Two modes are supported:
    * skip: skip the column, i.e. do not insert it (if the column already exists for that row, it is deleted)
    * empty: insert an empty value; the empty value for numeric types is 0, and for varchar it is the empty string
    * Required: no
    * Default: skip
## 4. Performance Report
## 5. Constraints and Limitations
The column order defined in the writer must match the reader's column order. The reader's column order defines how columns are arranged in each output row, while the writer's column order defines the order in which the writer expects to receive them. For example:

reader column order: c1, c2, c3, c4

writer column order: x1, x2, x3, x4

Then reader column c1 is assigned to writer column x1. If the writer's column order were instead x1, x2, x4, x3, then c3 would be assigned to x4 and c4 to x3.
## 6. FAQ
1. How many channels should I use? Does raising concurrency help when the job is slow?

   The import process runs with a default JVM heap of 2 GB, and concurrency (the channel count) is implemented with multiple threads. Running too many threads does not always speed up the import and may even hurt performance through overly frequent GC. A channel count of 5-10 is generally recommended.

2. What is a reasonable batchSize?

   The default is 256, but the best value should be derived from the row size: one batch should usually carry about 2 MB-4 MB of data, so divide that figure by the average row size to get the batchSize. With rows of roughly 1 KB, for example, that works out to a batchSize of about 2048-4096 (see the sketch below).
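For illustration only, a back-of-the-envelope batchSize calculation; the 3 MB batch target and 1 KB row size are assumptions, not plugin defaults.

```java
public class BatchSizeEstimate {
    public static void main(String[] args) {
        long targetBytesPerBatch = 3L * 1024 * 1024; // aim for roughly 2-4 MB per batch
        long avgRowSizeBytes = 1024;                 // assumed average row size of ~1 KB
        System.out.println("suggested batchSize ~= " + targetBytesPerBatch / avgRowSizeBytes); // 3072
    }
}
```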

hbase20xsqlwriter/pom.xml

@ -0,0 +1,107 @@
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<parent>
<artifactId>datax-all</artifactId>
<groupId>com.alibaba.datax</groupId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<modelVersion>4.0.0</modelVersion>
<artifactId>hbase20xsqlwriter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<phoenix.version>5.0.0-HBase-2.0</phoenix.version>
<avatica.version>1.12.0</avatica.version>
<commons-codec.version>1.8</commons-codec.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-queryserver</artifactId>
<version>${phoenix.version}</version>
</dependency>
<!-- for test -->
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-service-face</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
<version>1.9.5</version>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>


@ -0,0 +1,35 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/writer/hbase20xsqlwriter</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>hbase20xsqlwriter-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/writer/hbase20xsqlwriter</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/writer/hbase20xsqlwriter/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>


@ -0,0 +1,17 @@
package com.alibaba.datax.plugin.writer.hbase20xsqlwriter;
public final class Constant {
public static final String DEFAULT_NULL_MODE = "skip";
public static final String DEFAULT_SERIALIZATION = "PROTOBUF";
public static final int DEFAULT_BATCH_ROW_COUNT = 256; // 默认一次写256行
public static final int TYPE_UNSIGNED_TINYINT = 11;
public static final int TYPE_UNSIGNED_SMALLINT = 13;
public static final int TYPE_UNSIGNED_INTEGER = 9;
public static final int TYPE_UNSIGNED_LONG = 10;
public static final int TYPE_UNSIGNED_FLOAT = 14;
public static final int TYPE_UNSIGNED_DOUBLE = 15;
public static final int TYPE_UNSIGNED_DATE = 19;
public static final int TYPE_UNSIGNED_TIME = 18;
public static final int TYPE_UNSIGNED_TIMESTAMP = 20;
}


@ -0,0 +1,142 @@
package com.alibaba.datax.plugin.writer.hbase20xsqlwriter;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.*;
import java.util.ArrayList;
import java.util.List;
public class HBase20xSQLHelper {
private static final Logger LOG = LoggerFactory.getLogger(HBase20xSQLHelper.class);
/**
* phoenix瘦客户端连接前缀
*/
public static final String CONNECT_STRING_PREFIX = "jdbc:phoenix:thin:";
/**
* phoenix驱动名
*/
public static final String CONNECT_DRIVER_STRING = "org.apache.phoenix.queryserver.client.Driver";
/**
* 从系统表查找配置表信息
*/
public static final String SELECT_CATALOG_TABLE_STRING = "SELECT COLUMN_NAME FROM SYSTEM.CATALOG WHERE TABLE_NAME='%s' AND COLUMN_NAME IS NOT NULL";
/**
* 验证配置参数是否正确
*/
public static void validateParameter(com.alibaba.datax.common.util.Configuration originalConfig) {
// 表名和queryserver地址必须配置否则抛异常
String tableName = originalConfig.getNecessaryValue(Key.TABLE, HBase20xSQLWriterErrorCode.REQUIRED_VALUE);
String queryServerAddress = originalConfig.getNecessaryValue(Key.QUERYSERVER_ADDRESS, HBase20xSQLWriterErrorCode.REQUIRED_VALUE);
// 序列化格式可不配置默认PROTOBUF
String serialization = originalConfig.getString(Key.SERIALIZATION_NAME, Constant.DEFAULT_SERIALIZATION);
String connStr = getConnectionUrl(queryServerAddress, serialization);
// 校验jdbc连接是否正常
Connection conn = getThinClientConnection(connStr);
List<String> columnNames = originalConfig.getList(Key.COLUMN, String.class);
if (columnNames == null || columnNames.isEmpty()) {
throw DataXException.asDataXException(
HBase20xSQLWriterErrorCode.ILLEGAL_VALUE, "HBase的columns配置不能为空,请添加目标表的列名配置.");
}
String schema = originalConfig.getString(Key.SCHEMA);
// 检查表以及配置列是否存在
checkTable(conn, schema, tableName, columnNames);
}
/**
* 获取JDBC连接轻量级连接使用完后必须显式close
*/
public static Connection getThinClientConnection(String connStr) {
LOG.debug("Connecting to QueryServer [" + connStr + "] ...");
Connection conn;
try {
Class.forName(CONNECT_DRIVER_STRING);
conn = DriverManager.getConnection(connStr);
conn.setAutoCommit(false);
} catch (Throwable e) {
throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.GET_QUERYSERVER_CONNECTION_ERROR,
"无法连接QueryServer配置不正确或服务未启动请检查配置和服务状态或者联系HBase管理员.", e);
}
LOG.debug("Connected to QueryServer successfully.");
return conn;
}
public static Connection getJdbcConnection(Configuration conf) {
String queryServerAddress = conf.getNecessaryValue(Key.QUERYSERVER_ADDRESS, HBase20xSQLWriterErrorCode.REQUIRED_VALUE);
// 序列化格式可不配置默认PROTOBUF
String serialization = conf.getString(Key.SERIALIZATION_NAME, "PROTOBUF");
String connStr = getConnectionUrl(queryServerAddress, serialization);
return getThinClientConnection(connStr);
}
public static String getConnectionUrl(String queryServerAddress, String serialization) {
String urlFmt = CONNECT_STRING_PREFIX + "url=%s;serialization=%s";
return String.format(urlFmt, queryServerAddress, serialization);
}
public static void checkTable(Connection conn, String schema, String tableName, List<String> columnNames) throws DataXException {
String selectSystemTable = getSelectSystemSQL(schema, tableName);
Statement st = null;
ResultSet rs = null;
try {
st = conn.createStatement();
rs = st.executeQuery(selectSystemTable);
List<String> allColumns = new ArrayList<String>();
if (rs.next()) {
allColumns.add(rs.getString(1));
} else {
LOG.error(tableName + "表不存在,请检查表名是否正确或是否已创建.", HBase20xSQLWriterErrorCode.GET_HBASE_TABLE_ERROR);
throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.GET_HBASE_TABLE_ERROR,
tableName + "表不存在,请检查表名是否正确或是否已创建.");
}
while (rs.next()) {
allColumns.add(rs.getString(1));
}
for (String columnName : columnNames) {
if (!allColumns.contains(columnName)) {
// 用户配置的列名在元数据中不存在
throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.ILLEGAL_VALUE,
"您配置的列" + columnName + "在目的表" + tableName + "的元数据中不存在请检查您的配置或者联系HBase管理员.");
}
}
} catch (SQLException t) {
throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.GET_HBASE_TABLE_ERROR,
"获取表" + tableName + "信息失败请检查您的集群和表状态或者联系HBase管理员.", t);
} finally {
closeJdbc(conn, st, rs);
}
}
private static String getSelectSystemSQL(String schema, String tableName) {
String sql = String.format(SELECT_CATALOG_TABLE_STRING, tableName);
if (schema != null) {
sql = sql + " AND TABLE_SCHEM = '" + schema + "'";
}
return sql;
}
public static void closeJdbc(Connection connection, Statement statement, ResultSet resultSet) {
try {
if (resultSet != null) {
resultSet.close();
}
if (statement != null) {
statement.close();
}
if (connection != null) {
connection.close();
}
} catch (SQLException e) {
LOG.warn("数据库连接关闭异常.", HBase20xSQLWriterErrorCode.CLOSE_HBASE_CONNECTION_ERROR);
}
}
}


@ -0,0 +1,58 @@
package com.alibaba.datax.plugin.writer.hbase20xsqlwriter;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.spi.Writer;
import com.alibaba.datax.common.util.Configuration;
import java.util.ArrayList;
import java.util.List;
public class HBase20xSQLWriter extends Writer {
public static class Job extends Writer.Job {
private Configuration config = null;
@Override
public void init() {
this.config = this.getPluginJobConf();
HBase20xSQLHelper.validateParameter(this.config);
}
@Override
public List<Configuration> split(int mandatoryNumber) {
List<Configuration> splitResultConfigs = new ArrayList<Configuration>();
for (int j = 0; j < mandatoryNumber; j++) {
splitResultConfigs.add(config.clone());
}
return splitResultConfigs;
}
@Override
public void destroy() {
//doNothing
}
}
public static class Task extends Writer.Task {
private Configuration taskConfig;
private HBase20xSQLWriterTask writerTask;
@Override
public void init() {
this.taskConfig = super.getPluginJobConf();
this.writerTask = new HBase20xSQLWriterTask(this.taskConfig);
}
@Override
public void startWrite(RecordReceiver lineReceiver) {
this.writerTask.startWriter(lineReceiver, super.getTaskPluginCollector());
}
@Override
public void destroy() {
// 不需要close
}
}
}


@ -0,0 +1,37 @@
package com.alibaba.datax.plugin.writer.hbase20xsqlwriter;
import com.alibaba.datax.common.spi.ErrorCode;
public enum HBase20xSQLWriterErrorCode implements ErrorCode {
REQUIRED_VALUE("Hbasewriter-00", "您缺失了必须填写的参数值."),
ILLEGAL_VALUE("Hbasewriter-01", "您填写的参数值不合法."),
GET_QUERYSERVER_CONNECTION_ERROR("Hbasewriter-02", "获取QueryServer连接时出错."),
GET_HBASE_TABLE_ERROR("Hbasewriter-03", "获取 Hbase table时出错."),
CLOSE_HBASE_CONNECTION_ERROR("Hbasewriter-04", "关闭Hbase连接时出错."),
GET_TABLE_COLUMNTYPE_ERROR("Hbasewriter-05", "获取表列类型时出错."),
PUT_HBASE_ERROR("Hbasewriter-07", "写入hbase时发生IO异常."),
;
private final String code;
private final String description;
private HBase20xSQLWriterErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s].", this.code, this.description);
}
}


@ -0,0 +1,389 @@
package com.alibaba.datax.plugin.writer.hbase20xsqlwriter;
import com.alibaba.datax.common.element.Column;
import com.alibaba.datax.common.element.Record;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.plugin.RecordReceiver;
import com.alibaba.datax.common.plugin.TaskPluginCollector;
import com.alibaba.datax.common.util.Configuration;
import com.google.common.collect.Lists;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.math.BigDecimal;
import java.sql.*;
import java.util.Arrays;
import java.util.List;
public class HBase20xSQLWriterTask {
private final static Logger LOG = LoggerFactory.getLogger(HBase20xSQLWriterTask.class);
private Configuration configuration;
private TaskPluginCollector taskPluginCollector;
private Connection connection = null;
private PreparedStatement pstmt = null;
// 需要向hbsae写入的列的数量,即用户配置的column参数中列的个数时间戳不包含在内
private int numberOfColumnsToWrite;
// 期待从源头表的Record中拿到多少列
private int numberOfColumnsToRead;
private int[] columnTypes;
private List<String> columns;
private String fullTableName;
private NullModeType nullModeType;
private int batchSize;
public HBase20xSQLWriterTask(Configuration configuration) {
// 这里仅解析配置不访问远端集群配置的合法性检查在writer的init过程中进行
this.configuration = configuration;
}
public void startWriter(RecordReceiver lineReceiver, TaskPluginCollector taskPluginCollector) {
this.taskPluginCollector = taskPluginCollector;
try {
// 准备阶段
initialize();
// 写入数据
writeData(lineReceiver);
} catch (Throwable e) {
throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.PUT_HBASE_ERROR, e);
} finally {
// 关闭jdbc连接
HBase20xSQLHelper.closeJdbc(connection, pstmt, null);
}
}
/**
* 初始化JDBC操作对象及列类型
* @throws SQLException
*/
private void initialize() throws SQLException {
if (connection == null) {
connection = HBase20xSQLHelper.getJdbcConnection(configuration);
connection.setAutoCommit(false);
}
nullModeType = NullModeType.getByTypeName(configuration.getString(Key.NULLMODE, Constant.DEFAULT_NULL_MODE));
batchSize = configuration.getInt(Key.BATCHSIZE, Constant.DEFAULT_BATCH_ROW_COUNT);
String schema = configuration.getString(Key.SCHEMA);
String tableName = configuration.getNecessaryValue(Key.TABLE, HBase20xSQLWriterErrorCode.REQUIRED_VALUE);
fullTableName = tableName;
if (schema != null && !schema.isEmpty()) {
fullTableName = schema + "." + tableName;
}
columns = configuration.getList(Key.COLUMN, String.class);
if (pstmt == null) {
// 一个Task的生命周期中只使用一个PreparedStatement对象
pstmt = createPreparedStatement();
columnTypes = getColumnSqlType();
}
}
/**
* 生成sql模板并根据模板创建PreparedStatement
*/
private PreparedStatement createPreparedStatement() throws SQLException {
// 生成列名集合列之间用逗号分隔 col1,col2,col3,...
StringBuilder columnNamesBuilder = new StringBuilder();
for (String col : columns) {
// 列名使用双引号则不自动转换为全大写而是保留用户配置的大小写
columnNamesBuilder.append("\"");
columnNamesBuilder.append(col);
columnNamesBuilder.append("\"");
columnNamesBuilder.append(",");
}
// 移除末尾多余的逗号
columnNamesBuilder.setLength(columnNamesBuilder.length() - 1);
String columnNames = columnNamesBuilder.toString();
numberOfColumnsToWrite = columns.size();
numberOfColumnsToRead = numberOfColumnsToWrite; // 开始的时候要读的列数与要写的列数相等
// 生成UPSERT模板
StringBuilder upsertBuilder =
new StringBuilder("upsert into " + fullTableName + " (" + columnNames + " ) values (");
for (int i = 0; i < numberOfColumnsToWrite; i++) {
upsertBuilder.append("?,");
}
upsertBuilder.setLength(upsertBuilder.length() - 1); // 移除末尾多余的逗号
upsertBuilder.append(")");
String sql = upsertBuilder.toString();
PreparedStatement ps = connection.prepareStatement(sql);
LOG.debug("SQL template generated: " + sql);
return ps;
}
/**
* 根据列名来从数据库元数据中获取这一列对应的SQL类型
*/
private int[] getColumnSqlType() throws SQLException {
int[] types = new int[numberOfColumnsToWrite];
StringBuilder columnNamesBuilder = new StringBuilder();
for (String columnName : columns) {
columnNamesBuilder.append(columnName).append(",");
}
columnNamesBuilder.setLength(columnNamesBuilder.length() - 1);
// 查询一条数据获取表meta信息
String selectSql = "SELECT " + columnNamesBuilder + " FROM " + fullTableName + " LIMIT 1";
Statement statement = null;
try {
statement = connection.createStatement();
ResultSetMetaData meta = statement.executeQuery(selectSql).getMetaData();
for (int i = 0; i < columns.size(); i++) {
String name = columns.get(i);
types[i] = meta.getColumnType(i + 1);
LOG.debug("Column name : " + name + ", sql type = " + types[i] + " " + meta.getColumnTypeName(i + 1));
}
} catch (SQLException e) {
throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.GET_TABLE_COLUMNTYPE_ERROR,
"获取表" + fullTableName + "列类型失败请检查配置和服务状态或者联系HBase管理员.", e);
} finally {
HBase20xSQLHelper.closeJdbc(null, statement, null);
}
return types;
}
/**
* 从接收器中获取每条记录写入Phoenix
*/
private void writeData(RecordReceiver lineReceiver) throws SQLException {
List<Record> buffer = Lists.newArrayListWithExpectedSize(batchSize);
Record record = null;
while ((record = lineReceiver.getFromReader()) != null) {
// 校验列数量是否符合预期
if (record.getColumnNumber() != numberOfColumnsToRead) {
throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.ILLEGAL_VALUE,
"数据源给出的列数量[" + record.getColumnNumber() + "]与您配置中的列数量[" + numberOfColumnsToRead +
"]不同, 请检查您的配置 或者 联系 Hbase 管理员.");
}
buffer.add(record);
if (buffer.size() > batchSize) {
doBatchUpsert(buffer);
buffer.clear();
}
}
// 处理剩余的record
if (!buffer.isEmpty()) {
doBatchUpsert(buffer);
buffer.clear();
}
}
/**
* 批量提交一组数据如果失败则尝试一行行提交如果仍然失败抛错给用户
*/
private void doBatchUpsert(List<Record> records) throws SQLException {
try {
// 将所有record提交到connection缓存
for (Record r : records) {
setupStatement(r);
pstmt.addBatch();
}
pstmt.executeBatch();
// 将缓存的数据提交到phoenix
connection.commit();
pstmt.clearParameters();
pstmt.clearBatch();
} catch (SQLException e) {
LOG.error("Failed batch committing " + records.size() + " records", e);
// 批量提交失败则一行行重试以确定哪一行出错
connection.rollback();
HBase20xSQLHelper.closeJdbc(null, pstmt, null);
connection.setAutoCommit(true);
pstmt = createPreparedStatement();
doSingleUpsert(records);
} catch (Exception e) {
throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.PUT_HBASE_ERROR, e);
}
}
/**
* 单行提交将出错的行记录到脏数据中由脏数据收集模块判断任务是否继续
*/
private void doSingleUpsert(List<Record> records) throws SQLException {
int rowNumber = 0;
for (Record r : records) {
try {
rowNumber ++;
setupStatement(r);
pstmt.executeUpdate();
} catch (SQLException e) {
//出错了记录脏数据
LOG.error("Failed writing to phoenix, rowNumber: " + rowNumber);
this.taskPluginCollector.collectDirtyRecord(r, e);
}
}
}
private void setupStatement(Record record) throws SQLException {
for (int i = 0; i < numberOfColumnsToWrite; i++) {
Column col = record.getColumn(i);
int sqlType = columnTypes[i];
// PreparedStatement中的索引从1开始所以用i+1
setupColumn(i + 1, sqlType, col);
}
}
private void setupColumn(int pos, int sqlType, Column col) throws SQLException {
if (col.getRawData() != null) {
switch (sqlType) {
case Types.CHAR:
case Types.VARCHAR:
pstmt.setString(pos, col.asString());
break;
case Types.BINARY:
case Types.VARBINARY:
pstmt.setBytes(pos, col.asBytes());
break;
case Types.BOOLEAN:
pstmt.setBoolean(pos, col.asBoolean());
break;
case Types.TINYINT:
case Constant.TYPE_UNSIGNED_TINYINT:
pstmt.setByte(pos, col.asLong().byteValue());
break;
case Types.SMALLINT:
case Constant.TYPE_UNSIGNED_SMALLINT:
pstmt.setShort(pos, col.asLong().shortValue());
break;
case Types.INTEGER:
case Constant.TYPE_UNSIGNED_INTEGER:
pstmt.setInt(pos, col.asLong().intValue());
break;
case Types.BIGINT:
case Constant.TYPE_UNSIGNED_LONG:
pstmt.setLong(pos, col.asLong());
break;
case Types.FLOAT:
pstmt.setFloat(pos, col.asDouble().floatValue());
break;
case Types.DOUBLE:
pstmt.setDouble(pos, col.asDouble());
break;
case Types.DECIMAL:
pstmt.setBigDecimal(pos, col.asBigDecimal());
break;
case Types.DATE:
case Constant.TYPE_UNSIGNED_DATE:
pstmt.setDate(pos, new Date(col.asDate().getTime()));
break;
case Types.TIME:
case Constant.TYPE_UNSIGNED_TIME:
pstmt.setTime(pos, new Time(col.asDate().getTime()));
break;
case Types.TIMESTAMP:
case Constant.TYPE_UNSIGNED_TIMESTAMP:
pstmt.setTimestamp(pos, new Timestamp(col.asDate().getTime()));
break;
default:
throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.ILLEGAL_VALUE,
"不支持您配置的列类型:" + sqlType + ", 请检查您的配置 或者 联系 Hbase 管理员.");
}
} else {
// 没有值按空值的配置情况处理
switch (nullModeType){
case Skip:
// 跳过空值则不插入该列,
pstmt.setNull(pos, sqlType);
break;
case Empty:
// 插入"空值"请注意不同类型的空值不同
// 另外对SQL来说空值本身是有值的这与直接操作HBASE Native API时的空值完全不同
pstmt.setObject(pos, getEmptyValue(sqlType));
break;
default:
// nullMode的合法性在初始化配置的时候已经校验过这里一定不会出错
throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.ILLEGAL_VALUE,
"Hbasewriter 不支持该 nullMode 类型: " + nullModeType +
", 目前支持的 nullMode 类型是:" + Arrays.asList(NullModeType.values()));
}
}
}
/**
* 根据类型获取"空值"
* 值类型的空值都是0bool是falseString是空字符串
* @param sqlType sql数据类型定义于{@link Types}
*/
private Object getEmptyValue(int sqlType) {
switch (sqlType) {
case Types.VARCHAR:
return "";
case Types.BOOLEAN:
return false;
case Types.TINYINT:
case Constant.TYPE_UNSIGNED_TINYINT:
return (byte) 0;
case Types.SMALLINT:
case Constant.TYPE_UNSIGNED_SMALLINT:
return (short) 0;
case Types.INTEGER:
case Constant.TYPE_UNSIGNED_INTEGER:
return (int) 0;
case Types.BIGINT:
case Constant.TYPE_UNSIGNED_LONG:
return (long) 0;
case Types.FLOAT:
return (float) 0.0;
case Types.DOUBLE:
return (double) 0.0;
case Types.DECIMAL:
return new BigDecimal(0);
case Types.DATE:
case Constant.TYPE_UNSIGNED_DATE:
return new Date(0);
case Types.TIME:
case Constant.TYPE_UNSIGNED_TIME:
return new Time(0);
case Types.TIMESTAMP:
case Constant.TYPE_UNSIGNED_TIMESTAMP:
return new Timestamp(0);
case Types.BINARY:
case Types.VARBINARY:
return new byte[0];
default:
throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.ILLEGAL_VALUE,
"不支持您配置的列类型:" + sqlType + ", 请检查您的配置 或者 联系 Hbase 管理员.");
}
}
}


@ -0,0 +1,36 @@
package com.alibaba.datax.plugin.writer.hbase20xsqlwriter;
public class Key {
/**
* 必选writer要写入的表的表名
*/
public final static String TABLE = "table";
/**
* 必选writer要写入哪些列
*/
public final static String COLUMN = "column";
/**
* 必选Phoenix QueryServer服务地址
*/
public final static String QUERYSERVER_ADDRESS = "queryServerAddress";
/**
* 可选序列化格式默认为PROTOBUF
*/
public static final String SERIALIZATION_NAME = "serialization";
/**
* 可选批量写入的最大行数默认256行
*/
public static final String BATCHSIZE = "batchSize";
/**
* 可选遇到空值默认跳过
*/
public static final String NULLMODE = "nullMode";
/**
* 可选Phoenix表所属schema默认为空
*/
public static final String SCHEMA = "schema";
}


@ -0,0 +1,32 @@
package com.alibaba.datax.plugin.writer.hbase20xsqlwriter;
import com.alibaba.datax.common.exception.DataXException;
import java.util.Arrays;
public enum NullModeType {
Skip("skip"),
Empty("empty")
;
private String mode;
NullModeType(String mode) {
this.mode = mode.toLowerCase();
}
public String getMode() {
return mode;
}
public static NullModeType getByTypeName(String modeName) {
for (NullModeType modeType : values()) {
if (modeType.mode.equalsIgnoreCase(modeName)) {
return modeType;
}
}
throw DataXException.asDataXException(HBase20xSQLWriterErrorCode.ILLEGAL_VALUE,
"Hbasewriter 不支持该 nullMode 类型:" + modeName + ", 目前支持的 nullMode 类型是:" + Arrays.asList(values()));
}
}


@ -0,0 +1,7 @@
{
"name": "hbase20xsqlwriter",
"class": "com.alibaba.datax.plugin.writer.hbase20xsqlwriter.HBase20xSQLWriter",
"description": "useScene: prod. mechanism: use hbase sql UPSERT to put data, index tables will be updated too.",
"developer": "bake"
}


@ -0,0 +1,13 @@
{
"name": "hbase20xsqlwriter",
"parameter": {
"queryServerAddress": "",
"table": "",
"serialization": "PROTOBUF",
"column": [
],
"batchSize": "100",
"nullMode": "skip",
"schema": ""
}
}


@ -308,5 +308,19 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>hbase20xsqlreader/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>hbase20xsqlwriter/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
</fileSets>
</assembly>


@ -89,6 +89,8 @@
<!-- common support module -->
<module>plugin-rdbms-util</module>
<module>plugin-unstructured-storage-util</module>
<module>hbase20xsqlreader</module>
<module>hbase20xsqlwriter</module>
</modules>
<dependencyManagement>