Merge pull request #213 from lw309637554/add-hbase11xsqlreader

add hbase11xsqlreader
This commit is contained in:
binaryWorld 2018-11-07 20:27:49 +08:00 committed by GitHub
commit d4d1ea6a15
21 changed files with 1326 additions and 0 deletions


@ -47,6 +47,7 @@ DataX目前已经有了比较全面的插件体系主流的RDBMS数据库、N
| NoSQL data storage | OTS | √ | √ |[Read](https://github.com/alibaba/DataX/blob/master/otsreader/doc/otsreader.md), [Write](https://github.com/alibaba/DataX/blob/master/otswriter/doc/otswriter.md)|
| | Hbase0.94 | √ | √ |[Read](https://github.com/alibaba/DataX/blob/master/hbase094xreader/doc/hbase094xreader.md), [Write](https://github.com/alibaba/DataX/blob/master/hbase094xwriter/doc/hbase094xwriter.md)|
| | Hbase1.1 | √ | √ |[Read](https://github.com/alibaba/DataX/blob/master/hbase11xreader/doc/hbase11xreader.md), [Write](https://github.com/alibaba/DataX/blob/master/hbase11xwriter/doc/hbase11xwriter.md)|
| | Phoenix4.x | √ | √ |[Read](https://github.com/alibaba/DataX/blob/master/hbase11xsqlreader/doc/hbase11xsqlreader.md), [Write](https://github.com/alibaba/DataX/blob/master/hbase11xsqlwriter/doc/hbase11xsqlwriter.md)|
| | MongoDB | √ | √ |[Read](https://github.com/alibaba/DataX/blob/master/mongoreader/doc/mongoreader.md), [Write](https://github.com/alibaba/DataX/blob/master/mongowriter/doc/mongowriter.md)|
| | Hive | √ | √ |[Read](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md), [Write](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md)|
| Unstructured data storage | TxtFile | √ | √ |[Read](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md), [Write](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md)|


@ -0,0 +1,252 @@
# hbase11xsqlreader Plugin Documentation
___
## 1 Quick Introduction
The hbase11xsqlreader plugin reads data from Phoenix (HBase SQL). Under the hood, hbase11xsqlreader connects to the remote HBase cluster through the Phoenix client and executes the corresponding SQL to SELECT the data out of Phoenix.
## 2 Implementation Principle
In short, hbase11xsqlreader connects to the remote HBase cluster through the Phoenix client, generates a SELECT statement from the user configuration, sends it to the HBase cluster, wraps the returned results into DataX's own data types as an abstract record set, and hands the records to the downstream Writer.
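For illustration only, the sketch below shows this principle as a plain Phoenix JDBC query, assuming the Phoenix client driver is on the classpath; the quorum host and the `US_POPULATION` table are placeholders taken from the sample configuration below. The reader itself parallelizes the work via Phoenix's `PhoenixInputFormat` splits rather than issuing a single statement.
```
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class PhoenixSelectSketch {
    public static void main(String[] args) throws Exception {
        // Placeholder quorum host; use your own HBase cluster's ZooKeeper address (no 2181 port).
        String zkQuorum = "hb-proxy-xxx-001.hbase.rds.aliyuncs.com";
        // Same "jdbc:phoenix:<quorum>" URL format that the reader builds internally.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:" + zkQuorum);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT * FROM US_POPULATION")) {
            while (rs.next()) {
                // In the plugin, each row is wrapped into a DataX Record and sent to the Writer.
                System.out.println(rs.getString(1));
            }
        }
    }
}
```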
## 3 Features
### 3.1 Sample Configuration
* A job that reads data from Phoenix and prints it locally:
```
{
"job": {
"setting": {
"speed": {
// Transfer speed limit in bytes/s; DataX tries to reach, but not exceed, this rate.
"byte":10485760
},
// Error limits
"errorLimit": {
// Upper limit on the number of dirty records; the job fails once it is exceeded.
"record": 0,
// Upper limit on the ratio of dirty records: 1.0 means 100%, 0.02 means 2%.
"percentage": 0.02
}
},
"content": [ {
"reader": {
// Use the hbase11xsqlreader plugin
"name": "hbase11xsqlreader",
"parameter": {
// ZooKeeper quorum of the HBase cluster that Phoenix connects to
"hbaseConfig": {
"hbase.zookeeper.quorum": "hb-proxy-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-xxx-003.hbase.rds.aliyuncs.com"
},
// Name of the Phoenix table to read
"table": "US_POPULATION",
// Columns to read; leave empty to read all columns
"column": [
]
}
},
"writer": {
// Writer type
"name": "streamwriter",
// Whether to print the records
"parameter": {
"print":true,
"encoding": "UTF-8"
}
}
}
]
}
}
```
### 3.2 Parameters
* **hbaseConfig**
* Description: hbase11xsqlreader connects to the HBase cluster through the Phoenix client, so the ZooKeeper quorum (`hbase.zookeeper.quorum`) of the HBase cluster must be filled in here; do not append the 2181 port. An optional `zookeeper.znode.parent` entry is also honored.
* Required: yes <br />
* Default: none <br />
* **table**
* Description: name of the Phoenix table to read; if the table lives in a namespace, set this to 'namespace.tablename' (for example, 'MYNS.US_POPULATION').
* Required: yes <br />
* Default: none <br />
* **column**
* Description: the set of column names to read from the Phoenix table, given as a JSON array; an empty array means all columns are read.
* Required: yes <br />
* Default: none <br />
### 3.3 Type Conversion
hbase11xsqlreader supports most Phoenix data types, but a few types are not supported yet, so please check your column types.
The type conversion that hbase11xsqlreader performs for Phoenix types is listed below:

| DataX Internal Type | Phoenix Data Type |
| -------- | ----- |
| String | CHAR, VARCHAR |
| Bytes | BINARY, VARBINARY |
| Bool | BOOLEAN |
| Long | INTEGER, TINYINT, SMALLINT, BIGINT |
| Double | FLOAT, DECIMAL, DOUBLE |
| Date | DATE, TIME, TIMESTAMP |
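For reference, the sketch below expresses the mapping table as code. The class and method names are illustrative only, but the `java.sql.Types` cases mirror the conversion the reader task performs internally.
```
import java.sql.Types;

final class TypeMappingSketch {
    // Illustrative helper: returns the DataX internal type name for a java.sql.Types code,
    // following the table above.
    static String dataxTypeFor(int sqlType) {
        switch (sqlType) {
            case Types.CHAR:
            case Types.VARCHAR:
                return "String";
            case Types.BINARY:
            case Types.VARBINARY:
                return "Bytes";
            case Types.BOOLEAN:
                return "Bool";
            case Types.INTEGER:
            case Types.TINYINT:
            case Types.SMALLINT:
            case Types.BIGINT:
                return "Long";
            case Types.FLOAT:
            case Types.DECIMAL:
            case Types.DOUBLE:
                return "Double";
            case Types.DATE:
            case Types.TIME:
            case Types.TIMESTAMP:
                return "Date";
            default:
                throw new IllegalArgumentException("Unsupported Phoenix sqlType: " + sqlType);
        }
    }
}
```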
## 4 Performance Report
## 5 Constraints and Limitations
## 6 FAQ
***

hbase11xsqlreader/pom.xml

@ -0,0 +1,109 @@
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-all</artifactId>
<version>0.0.1-SNAPSHOT</version>
</parent>
<artifactId>hbase11xsqlreader</artifactId>
<name>hbase11xsqlreader</name>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>
<properties>
<phoenix.version>4.12.0-AliHBase-1.1-0.5</phoenix.version>
</properties>
<dependencies>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-common</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<artifactId>slf4j-log4j12</artifactId>
<groupId>org.slf4j</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.aliyun.phoenix</groupId>
<artifactId>ali-phoenix-core</artifactId>
<version>${phoenix.version}</version>
<exclusions>
<exclusion>
<artifactId>servlet-api</artifactId>
<groupId>javax.servlet</groupId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
<version>2.0.44-beta</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-core</artifactId>
<version>${datax-project-version}</version>
<exclusions>
<exclusion>
<groupId>com.alibaba.datax</groupId>
<artifactId>datax-service-face</artifactId>
</exclusion>
</exclusions>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<resources>
<resource>
<directory>src/main/java</directory>
<includes>
<include>**/*.properties</include>
</includes>
</resource>
</resources>
<plugins>
<!-- compiler plugin -->
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<configuration>
<source>1.6</source>
<target>1.6</target>
<encoding>${project-sourceEncoding}</encoding>
</configuration>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
<descriptors>
<descriptor>src/main/assembly/package.xml</descriptor>
</descriptors>
<finalName>datax</finalName>
</configuration>
<executions>
<execution>
<id>dwzip</id>
<phase>package</phase>
<goals>
<goal>single</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>


@ -0,0 +1,35 @@
<assembly
xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
<id></id>
<formats>
<format>dir</format>
</formats>
<includeBaseDirectory>false</includeBaseDirectory>
<fileSets>
<fileSet>
<directory>src/main/resources</directory>
<includes>
<include>plugin.json</include>
<include>plugin_job_template.json</include>
</includes>
<outputDirectory>plugin/reader/hbase11xsqlreader</outputDirectory>
</fileSet>
<fileSet>
<directory>target/</directory>
<includes>
<include>hbase11xsqlreader-0.0.1-SNAPSHOT.jar</include>
</includes>
<outputDirectory>plugin/reader/hbase11xsqlreader</outputDirectory>
</fileSet>
</fileSets>
<dependencySets>
<dependencySet>
<useProjectArtifact>false</useProjectArtifact>
<outputDirectory>plugin/reader/hbase11xsqlreader/libs</outputDirectory>
<scope>runtime</scope>
</dependencySet>
</dependencySets>
</assembly>


@ -0,0 +1,31 @@
package com.alibaba.datax.plugin.reader.hbase11xsqlreader;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
import java.io.IOException;
import org.apache.hadoop.io.Writable;
public class HadoopSerializationUtil {
public static byte[] serialize(Writable writable) throws IOException {
ByteArrayOutputStream out = new ByteArrayOutputStream();
DataOutputStream dataout = new DataOutputStream(out);
writable.write(dataout);
dataout.close();
return out.toByteArray();
}
public static void deserialize(Writable writable, byte[] bytes) throws Exception {
ByteArrayInputStream in = new ByteArrayInputStream(bytes);
DataInputStream datain = new DataInputStream(in);
writable.readFields(datain);
datain.close();
}
}


@ -0,0 +1,125 @@
package com.alibaba.datax.plugin.reader.hbase11xsqlreader;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.TypeReference;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.JobID;
import org.apache.hadoop.mapreduce.task.JobContextImpl;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
import org.apache.phoenix.mapreduce.PhoenixInputFormat;
import org.apache.phoenix.mapreduce.PhoenixInputSplit;
import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.SaltingUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
public class HbaseSQLHelper {
private static final Logger LOG = LoggerFactory.getLogger(HbaseSQLHelper.class);
public static org.apache.hadoop.conf.Configuration generatePhoenixConf(HbaseSQLReaderConfig readerConfig) {
org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
String table = readerConfig.getTableName();
List<String> columns = readerConfig.getColumns();
String zkUrl = readerConfig.getZkUrl();
PhoenixConfigurationUtil.setInputClass(conf, PhoenixRecordWritable.class);
PhoenixConfigurationUtil.setInputTableName(conf, table);
if (!columns.isEmpty()) {
PhoenixConfigurationUtil.setSelectColumnNames(conf, columns.toArray(new String[columns.size()]));
}
PhoenixEmbeddedDriver.ConnectionInfo info = null;
try {
info = PhoenixEmbeddedDriver.ConnectionInfo.create(zkUrl);
} catch (SQLException e) {
throw DataXException.asDataXException(
HbaseSQLReaderErrorCode.GET_PHOENIX_CONNECTIONINFO_ERROR, "通过zkURL获取phoenix的connectioninfo出错请检查hbase集群服务是否正常", e);
}
conf.set(HConstants.ZOOKEEPER_QUORUM, info.getZookeeperQuorum());
if (info.getPort() != null)
conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, info.getPort());
if (info.getRootNode() != null)
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, info.getRootNode());
return conf;
}
public static List<String> getPColumnNames(String connectionString, String tableName) throws SQLException {
Connection con =
DriverManager.getConnection(connectionString);
PhoenixConnection phoenixConnection = con.unwrap(PhoenixConnection.class);
MetaDataClient metaDataClient = new MetaDataClient(phoenixConnection);
PTable table = metaDataClient.updateCache("", tableName).getTable();
List<String> columnNames = new ArrayList<String>();
for (PColumn pColumn : table.getColumns()) {
if (!pColumn.getName().getString().equals(SaltingUtil.SALTING_COLUMN_NAME))
columnNames.add(pColumn.getName().getString());
else
LOG.info(tableName + " is salt table");
}
return columnNames;
}
public static List<Configuration> split(HbaseSQLReaderConfig readerConfig) {
PhoenixInputFormat inputFormat = new PhoenixInputFormat<PhoenixRecordWritable>();
org.apache.hadoop.conf.Configuration conf = generatePhoenixConf(readerConfig);
JobID jobId = new JobID(Key.MOCK_JOBID_IDENTIFIER, Key.MOCK_JOBID);
JobContextImpl jobContext = new JobContextImpl(conf, jobId);
List<Configuration> resultConfigurations = new ArrayList<Configuration>();
List<InputSplit> rawSplits = null;
try {
rawSplits = inputFormat.getSplits(jobContext);
LOG.info("split size is " + rawSplits.size());
for (InputSplit split : rawSplits) {
Configuration cfg = readerConfig.getOriginalConfig().clone();
byte[] splitSer = HadoopSerializationUtil.serialize((PhoenixInputSplit) split);
String splitBase64Str = org.apache.commons.codec.binary.Base64.encodeBase64String(splitSer);
cfg.set(Key.SPLIT_KEY, splitBase64Str);
resultConfigurations.add(cfg);
}
} catch (IOException e) {
throw DataXException.asDataXException(
HbaseSQLReaderErrorCode.GET_PHOENIX_SPLITS_ERROR, "获取表的split信息时出现了异常请检查hbase集群服务是否正常," + e.getMessage(), e);
} catch (InterruptedException e) {
throw DataXException.asDataXException(
HbaseSQLReaderErrorCode.GET_PHOENIX_SPLITS_ERROR, "获取表的split信息时被中断请重试若还有问题请联系datax管理员," + e.getMessage(), e);
}
return resultConfigurations;
}
public static HbaseSQLReaderConfig parseConfig(Configuration cfg) {
return HbaseSQLReaderConfig.parse(cfg);
}
public static Pair<String, String> getHbaseConfig(String hbaseCfgString) {
assert hbaseCfgString != null;
Map<String, String> hbaseConfigMap = JSON.parseObject(hbaseCfgString, new TypeReference<Map<String, String>>() {
});
String zkQuorum = hbaseConfigMap.get(Key.HBASE_ZK_QUORUM);
String znode = hbaseConfigMap.get(Key.HBASE_ZNODE_PARENT);
if(znode == null)
znode = "";
return new Pair<String, String>(zkQuorum, znode);
}
}


@ -0,0 +1,89 @@
package com.alibaba.datax.plugin.reader.hbase11xsqlreader;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.spi.Reader;
import com.alibaba.datax.common.util.Configuration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.List;
public class HbaseSQLReader extends Reader {
public static class Job extends Reader.Job {
private HbaseSQLReaderConfig readerConfig;
@Override
public void init() {
readerConfig = HbaseSQLHelper.parseConfig(this.getPluginJobConf());
}
@Override
public List<Configuration> split(int adviceNumber) {
return HbaseSQLHelper.split(readerConfig);
}
@Override
public void destroy() {
}
}
public static class Task extends Reader.Task {
private static Logger LOG = LoggerFactory.getLogger(Task.class);
private HbaseSQLReaderTask hbase11SQLReaderTask;
@Override
public void init() {
hbase11SQLReaderTask = new HbaseSQLReaderTask(this.getPluginJobConf());
this.hbase11SQLReaderTask.init();
}
@Override
public void prepare() {
hbase11SQLReaderTask.prepare();
}
@Override
public void startRead(RecordSender recordSender) {
Long recordNum = 0L;
Record record = recordSender.createRecord();
boolean fetchOK;
while (true) {
try {
fetchOK = this.hbase11SQLReaderTask.readRecord(record);
} catch (Exception e) {
LOG.info("Read record exception", e);
e.printStackTrace();
super.getTaskPluginCollector().collectDirtyRecord(record, e);
record = recordSender.createRecord();
continue;
}
if (fetchOK) {
recordSender.sendToWriter(record);
recordNum++;
if (recordNum % 10000 == 0)
LOG.info("already read record num is " + recordNum);
record = recordSender.createRecord();
} else {
break;
}
}
recordSender.flush();
}
@Override
public void post() {
super.post();
}
@Override
public void destroy() {
this.hbase11SQLReaderTask.destroy();
}
}
}


@ -0,0 +1,164 @@
package com.alibaba.datax.plugin.reader.hbase11xsqlreader;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.util.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.sql.SQLException;
import java.util.List;
public class HbaseSQLReaderConfig {
private final static Logger LOG = LoggerFactory.getLogger(HbaseSQLReaderConfig.class);
private Configuration originalConfig; // the original DataX configuration
// cluster settings
private String connectionString;
public String getZkUrl() {
return zkUrl;
}
private String zkUrl;
// table settings
private String tableName;
private List<String> columns; // names of the columns to read, primary-key and non-primary-key, excluding the time column
/**
 * @return the original DataX configuration
 */
public Configuration getOriginalConfig() {
return originalConfig;
}
/**
 * @return the Phoenix JDBC connection string (ZooKeeper mode)
 */
public String getConnectionString() {
return connectionString;
}
/**
 * @return the table name
 */
public String getTableName() {
return tableName;
}
/**
 * @return all columns, primary-key and non-primary-key, excluding the version column
 */
public List<String> getColumns() {
return columns;
}
/**
 * @param dataxCfg the raw DataX job configuration
 * @return the parsed reader configuration
 */
public static HbaseSQLReaderConfig parse(Configuration dataxCfg) {
assert dataxCfg != null;
HbaseSQLReaderConfig cfg = new HbaseSQLReaderConfig();
cfg.originalConfig = dataxCfg;
// 1. parse the cluster configuration
parseClusterConfig(cfg, dataxCfg);
// 2. parse the table configuration
parseTableConfig(cfg, dataxCfg);
// 3. log the parsed configuration
LOG.info("HBase SQL reader config parsed:" + cfg.toString());
return cfg;
}
private static void parseClusterConfig(HbaseSQLReaderConfig cfg, Configuration dataxCfg) {
// get the HBase cluster connection info string
String hbaseCfg = dataxCfg.getString(Key.HBASE_CONFIG);
if (StringUtils.isBlank(hbaseCfg)) {
// the cluster configuration is required and must not be blank
throw DataXException.asDataXException(
HbaseSQLReaderErrorCode.REQUIRED_VALUE, "读 Hbase 时需要配置hbaseConfig其内容为 Hbase 连接信息,请查看 Hbase 集群信息.");
}
// parse the ZooKeeper quorum and znode parent
Pair<String, String> zkCfg;
try {
zkCfg = HbaseSQLHelper.getHbaseConfig(hbaseCfg);
} catch (Throwable t) {
// failed to parse hbaseConfig
throw DataXException.asDataXException(
HbaseSQLReaderErrorCode.REQUIRED_VALUE, "解析hbaseConfig出错请确认您配置的hbaseConfig为合法的json数据格式内容正确." );
}
String zkQuorum = zkCfg.getFirst();
String znode = zkCfg.getSecond();
if (zkQuorum == null || zkQuorum.isEmpty()) {
throw DataXException.asDataXException(
HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "HBase的hbase.zookeeper.quorum配置不能为空" );
}
// build the JDBC connection string, format: jdbc:phoenix:zk_quorum[:/znode_parent]
cfg.connectionString = "jdbc:phoenix:" + zkQuorum;
cfg.zkUrl = zkQuorum + ":2181";
if (!znode.isEmpty()) {
cfg.connectionString += ":" + znode;
cfg.zkUrl += ":" + znode;
}
}
private static void parseTableConfig(HbaseSQLReaderConfig cfg, Configuration dataxCfg) {
// parse and validate the table name
cfg.tableName = dataxCfg.getString(Key.TABLE);
if (cfg.tableName == null || cfg.tableName.isEmpty()) {
throw DataXException.asDataXException(
HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "HBase的tableName配置不能为空,请检查并修改配置." );
}
// parse the column list; when it is empty, fill in all columns of the table
cfg.columns = dataxCfg.getList(Key.COLUMN, String.class);
if (cfg.columns == null) {
throw DataXException.asDataXException(
HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "您配置的tableName含有非法字符{0},请检查您的配置.");
} else if (cfg.columns.isEmpty()) {
try {
cfg.columns = HbaseSQLHelper.getPColumnNames(cfg.connectionString, cfg.tableName);
dataxCfg.set(Key.COLUMN, cfg.columns);
} catch (SQLException e) {
throw DataXException.asDataXException(
HbaseSQLReaderErrorCode.GET_PHOENIX_COLUMN_ERROR, "HBase的columns配置不能为空,请添加目标表的列名配置." + e.getMessage(), e);
}
}
}
@Override
public String toString() {
StringBuilder ret = new StringBuilder();
// cluster settings
ret.append("\n[jdbc]");
ret.append(connectionString);
ret.append("\n");
// table settings
ret.append("[tableName]");
ret.append(tableName);
ret.append("\n");
ret.append("[column]");
for (String col : columns) {
ret.append(col);
ret.append(",");
}
ret.setLength(ret.length() - 1);
ret.append("\n");
return ret.toString();
}
/**
 * Instances must not be created directly; call {@link #parse} to obtain an initialized instance.
 */
private HbaseSQLReaderConfig() {
}
}


@ -0,0 +1,40 @@
package com.alibaba.datax.plugin.reader.hbase11xsqlreader;
import com.alibaba.datax.common.spi.ErrorCode;
public enum HbaseSQLReaderErrorCode implements ErrorCode {
REQUIRED_VALUE("Hbasewriter-00", "您缺失了必须填写的参数值."),
ILLEGAL_VALUE("Hbasewriter-01", "您填写的参数值不合法."),
GET_PHOENIX_COLUMN_ERROR("Hbasewriter-02", "获取phoenix表的列值错误"),
GET_PHOENIX_CONNECTIONINFO_ERROR("Hbasewriter-03", "获取phoenix服务的zkurl错误"),
GET_PHOENIX_SPLITS_ERROR("Hbasewriter-04", "获取phoenix的split信息错误"),
PHOENIX_CREATEREADER_ERROR("Hbasewriter-05", "获取phoenix的reader错误"),
PHOENIX_READERINIT_ERROR("Hbasewriter-06", "phoenix reader的初始化错误"),
PHOENIX_COLUMN_TYPE_CONVERT_ERROR("Hbasewriter-07", "phoenix的列类型转换错误"),
PHOENIX_RECORD_READ_ERROR("Hbasewriter-08", "phoenix record 读取错误"),
PHOENIX_READER_CLOSE_ERROR("Hbasewriter-09", "phoenix reader 的close错误")
;
private final String code;
private final String description;
private HbaseSQLReaderErrorCode(String code, String description) {
this.code = code;
this.description = description;
}
@Override
public String getCode() {
return this.code;
}
@Override
public String getDescription() {
return this.description;
}
@Override
public String toString() {
return String.format("Code:[%s], Description:[%s].", this.code, this.description);
}
}


@ -0,0 +1,178 @@
package com.alibaba.datax.plugin.reader.hbase11xsqlreader;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.mapreduce.PhoenixInputFormat;
import org.apache.phoenix.mapreduce.PhoenixInputSplit;
import org.apache.phoenix.mapreduce.PhoenixRecordReader;
import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PTable;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.math.BigDecimal;
import java.sql.*;
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
/**
* Created by admin on 1/3/18.
*/
public class HbaseSQLReaderTask {
private static Logger LOG = LoggerFactory.getLogger(HbaseSQLReaderTask.class);
private PhoenixInputFormat phoenixInputFormat;
PhoenixInputSplit phoenixInputSplit;
private PhoenixRecordReader phoenixRecordReader;
private Map<String, PColumn> pColumns;
private HbaseSQLReaderConfig readerConfig;
private TaskAttemptContextImpl hadoopAttemptContext;
public HbaseSQLReaderTask(Configuration config) {
this.readerConfig = HbaseSQLHelper.parseConfig(config);
pColumns = new LinkedHashMap<String, PColumn>();
}
private void getPColumns() throws SQLException {
Connection con =
DriverManager.getConnection(this.readerConfig.getConnectionString());
PhoenixConnection phoenixConnection = con.unwrap(PhoenixConnection.class);
MetaDataClient metaDataClient = new MetaDataClient(phoenixConnection);
PTable table = metaDataClient.updateCache("", this.readerConfig.getTableName()).getTable();
List<String> columnNames = this.readerConfig.getColumns();
for (PColumn pColumn : table.getColumns()) {
if (columnNames.contains(pColumn.getName().getString())) {
pColumns.put(pColumn.getName().getString(), pColumn);
}
}
}
public void init() {
LOG.info("reader table info: " + this.readerConfig.toString());
try {
this.getPColumns();
} catch (SQLException e) {
throw DataXException.asDataXException(
HbaseSQLReaderErrorCode.GET_PHOENIX_COLUMN_ERROR, "获取表的列出问题，重试，若还有问题请检查hbase集群状态," + e.getMessage());
}
this.phoenixInputFormat = new PhoenixInputFormat<PhoenixRecordWritable>();
String splitBase64Str = this.readerConfig.getOriginalConfig().getString(Key.SPLIT_KEY);
byte[] splitBytes = org.apache.commons.codec.binary.Base64.decodeBase64(splitBase64Str);
TaskAttemptID attemptId = new TaskAttemptID();
org.apache.hadoop.conf.Configuration conf = HbaseSQLHelper.generatePhoenixConf(this.readerConfig);
this.hadoopAttemptContext = new TaskAttemptContextImpl(conf, attemptId);
this.phoenixInputSplit = new PhoenixInputSplit();
try {
HadoopSerializationUtil.deserialize(phoenixInputSplit, splitBytes);
this.phoenixRecordReader = (PhoenixRecordReader) phoenixInputFormat.createRecordReader(phoenixInputSplit, hadoopAttemptContext);
} catch (Exception e) {
throw DataXException.asDataXException(
HbaseSQLReaderErrorCode.PHOENIX_CREATEREADER_ERROR, "创建phoenix的reader出现问题,请重试若还有问题请检查hbase集群状态," + e.getMessage());
}
}
public void prepare() {
try {
this.phoenixRecordReader.initialize(this.phoenixInputSplit, hadoopAttemptContext);
} catch (IOException e) {
throw DataXException.asDataXException(
HbaseSQLReaderErrorCode.PHOENIX_READERINIT_ERROR, "phoenix的reader初始化出现问题,请重试若还有问题请检查hbase集群状态" + e.getMessage());
} catch (InterruptedException e) {
throw DataXException.asDataXException(
HbaseSQLReaderErrorCode.PHOENIX_READERINIT_ERROR, "phoenix的reader初始化被中断,请重试," + e.getMessage());
}
}
private Column convertPhoenixValueToDataxColumn(int sqlType, Object value) throws IOException {
Column column;
switch (sqlType) {
case Types.CHAR:
case Types.VARCHAR:
column = new StringColumn((String) value);
break;
case Types.BINARY:
case Types.VARBINARY:
column = new BytesColumn((byte[]) value);
break;
case Types.BOOLEAN:
column = new BoolColumn((Boolean) value);
break;
case Types.INTEGER:
column = new LongColumn((Integer) value);
break;
case Types.TINYINT:
column = new LongColumn(((Byte) value).longValue());
break;
case Types.SMALLINT:
column = new LongColumn(((Short) value).longValue());
break;
case Types.BIGINT:
column = new LongColumn((Long) value);
break;
case Types.FLOAT:
column = new DoubleColumn(((Float) value).doubleValue());
break;
case Types.DECIMAL:
column = new DoubleColumn(((BigDecimal) value));
break;
case Types.DOUBLE:
column = new DoubleColumn((Double) value);
break;
case Types.DATE:
column = new DateColumn((Date) value);
break;
case Types.TIME:
column = new DateColumn((Time) value);
break;
case Types.TIMESTAMP:
column = new DateColumn((Timestamp) value);
break;
default:
throw DataXException.asDataXException(
HbaseSQLReaderErrorCode.PHOENIX_COLUMN_TYPE_CONVERT_ERROR, "遇到不可识别的phoenix类型" + "sqlType :" + sqlType);
}
return column;
}
private void constructRecordFromPhoenix(Record record, Map<String, Object> phoenixRecord) throws IOException {
for (Map.Entry<String, PColumn> pColumnItem : this.pColumns.entrySet()) {
Column column = this.convertPhoenixValueToDataxColumn(
pColumnItem.getValue().getDataType().getSqlType(),
phoenixRecord.get(pColumnItem.getKey()));
record.addColumn(column);
}
}
public boolean readRecord(Record record) throws IOException, InterruptedException {
boolean hasNext = false;
hasNext = this.phoenixRecordReader.nextKeyValue();
if (!hasNext)
return hasNext;
PhoenixRecordWritable phoenixRecordWritable = (PhoenixRecordWritable) this.phoenixRecordReader.getCurrentValue();
Map<String, Object> phoenixRecord = phoenixRecordWritable.getResultMap();
this.constructRecordFromPhoenix(record, phoenixRecord);
return hasNext;
}
public void destroy() {
if (this.phoenixRecordReader != null) {
try {
this.phoenixRecordReader.close();
} catch (IOException e) {
throw DataXException.asDataXException(
HbaseSQLReaderErrorCode.PHOENIX_READER_CLOSE_ERROR, "phoenix的reader close失败,请重试若还有问题请检查hbase集群状态" + e.getMessage());
}
}
}
}


@ -0,0 +1,28 @@
package com.alibaba.datax.plugin.reader.hbase11xsqlreader;
import org.apache.hadoop.hbase.HConstants;
public final class Key {
public final static String MOCK_JOBID_IDENTIFIER = "phoenixreader";
public final static int MOCK_JOBID = 1;
public final static String SPLIT_KEY = "phoenixsplit";
/**
 * Required: HBase cluster configuration. The minimal settings needed to connect to a cluster are the ZooKeeper quorum and the znode parent.
 */
public final static String HBASE_CONFIG = "hbaseConfig";
public final static String HBASE_ZK_QUORUM = HConstants.ZOOKEEPER_QUORUM;
public final static String HBASE_ZNODE_PARENT = HConstants.ZOOKEEPER_ZNODE_PARENT;
/**
 * Required: name of the Phoenix table to read.
 */
public final static String TABLE = "table";
/**
 * Required: column configuration.
 */
public final static String COLUMN = "column";
}


@ -0,0 +1,32 @@
errorcode.required_value=\u60A8\u7F3A\u5931\u4E86\u5FC5\u987B\u586B\u5199\u7684\u53C2\u6570\u503C.
errorcode.illegal_value=\u60A8\u586B\u5199\u7684\u53C2\u6570\u503C\u4E0D\u5408\u6CD5.
errorcode.get_phoenix_table_columns_error=\u83B7\u53D6\u8868\u7684\u5217\u51FA\u9519.
errorcode.get_phoenix_connectioninfo_error=\u83B7\u53D6phoenix\u7684connectioninfo\u51FA\u9519.
errorcode.get_phoenix_splits_error=\u83B7\u53D6phoenix\u7684split\u4FE1\u606F\u65F6\u51FA\u9519.
errorcode.get_phoenix_createreader_error=\u521B\u5EFAphoenix\u7684split\u7684reader\u65F6\u51FA\u9519.
errorcode.get_phoenix_readerinit_error=phoenix\u7684split\u7684reader\u521D\u59CB\u5316\u65F6\u51FA\u9519.
errorcode.get_phoenix_column_typeconvert_error=\u5C06phoenix\u5217\u7684\u7C7B\u578B\u8F6C\u6362\u4E3Adatax\u7684\u7C7B\u578B\u65F6\u51FA\u9519.
errorcode.get_phoenix_record_read_error=\u8BFB\u53D6phoenix\u5177\u4F53\u7684\u4E00\u884C\u65F6\u51FA\u9519.
errorcode.get_phoenix_reader_close_error=\u5173\u95EDphoenix\u3000reader\u65F6\u51FA\u9519.
sqlhelper.1=\u901A\u8FC7zkURL\u83B7\u53D6phoenix\u7684connectioninfo\u51FA\u9519\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u670D\u52A1\u662F\u5426\u6B63\u5E38
sqlhelper.2=\u83B7\u53D6\u8868\u7684split\u4FE1\u606F\u65F6\u51FA\u73B0\u4E86\u5F02\u5E38\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u670D\u52A1\u662F\u5426\u6B63\u5E38
sqlhelper.3=\u83B7\u53D6\u8868\u7684split\u4FE1\u606F\u65F6\u88AB\u4E2D\u65AD\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u8054\u7CFBdatax\u7BA1\u7406\u5458
sqlreadertask.1=\u83B7\u53D6\u8868\u7684\u5217\u51FA\u95EE\u9898\uFF0C\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.2=\u521B\u5EFAphoenix\u7684reader\u51FA\u73B0\u95EE\u9898,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.3=phoenix\u7684reader\u521D\u59CB\u5316\u51FA\u73B0\u95EE\u9898,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.4=phoenix\u7684reader\u521D\u59CB\u5316\u88AB\u4E2D\u65AD,\u8BF7\u91CD\u8BD5
sqlreadertask.5=\u9047\u5230\u4E0D\u53EF\u8BC6\u522B\u7684phoenix\u7C7B\u578B\uFF0C\u8BF7\u8054\u7CFBhbase\u7BA1\u7406\u5458
sqlreadertask.6=\u8BFB\u53D6phoenix\u7684record\u65F6\u51FA\u73B0\u95EE\u9898\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.7=\u8BFB\u53D6phoenix\u7684record\u65F6\u51FA\u73B0\u95EE\u9898\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.8=phoenix\u7684reader close\u5931\u8D25,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
hbaseconfig.1=hbase\u7684\u914D\u7F6E\u4FE1\u606F\u4E0D\u80FD\u4E3A\u7A7A
hbaseconfig.2=hbase\u7684\u914D\u7F6E\u4FE1\u606F\u6709\u95EE\u9898\uFF0C\u8BF7\u53C2\u8003\u6587\u6863\u68C0\u67E5\u4E0B
hbaseconfig.3=zkquorum\u4E0D\u80FD\u4E3A\u7A7A
hbaseconfig.5=table\u7684\u540D\u5B57\u4E0D\u80FD\u4E3A\u7A7A
hbaseconfig.6=column\u53C2\u6570\u6CA1\u6709\u914D\u7F6E
hbaseconfig.7=\u4ECEphoenix\u83B7\u53D6column\u51FA\u9519\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001


@ -0,0 +1,32 @@
errorcode.required_value=\u60A8\u7F3A\u5931\u4E86\u5FC5\u987B\u586B\u5199\u7684\u53C2\u6570\u503C.
errorcode.illegal_value=\u60A8\u586B\u5199\u7684\u53C2\u6570\u503C\u4E0D\u5408\u6CD5.
errorcode.get_phoenix_table_columns_error=\u83B7\u53D6\u8868\u7684\u5217\u51FA\u9519.
errorcode.get_phoenix_connectioninfo_error=\u83B7\u53D6phoenix\u7684connectioninfo\u51FA\u9519.
errorcode.get_phoenix_splits_error=\u83B7\u53D6phoenix\u7684split\u4FE1\u606F\u65F6\u51FA\u9519.
errorcode.get_phoenix_createreader_error=\u521B\u5EFAphoenix\u7684split\u7684reader\u65F6\u51FA\u9519.
errorcode.get_phoenix_readerinit_error=phoenix\u7684split\u7684reader\u521D\u59CB\u5316\u65F6\u51FA\u9519.
errorcode.get_phoenix_column_typeconvert_error=\u5C06phoenix\u5217\u7684\u7C7B\u578B\u8F6C\u6362\u4E3Adatax\u7684\u7C7B\u578B\u65F6\u51FA\u9519.
errorcode.get_phoenix_record_read_error=\u8BFB\u53D6phoenix\u5177\u4F53\u7684\u4E00\u884C\u65F6\u51FA\u9519.
errorcode.get_phoenix_reader_close_error=\u5173\u95EDphoenix\u3000reader\u65F6\u51FA\u9519.
sqlhelper.1=\u901A\u8FC7zkURL\u83B7\u53D6phoenix\u7684connectioninfo\u51FA\u9519\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u670D\u52A1\u662F\u5426\u6B63\u5E38
sqlhelper.2=\u83B7\u53D6\u8868\u7684split\u4FE1\u606F\u65F6\u51FA\u73B0\u4E86\u5F02\u5E38\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u670D\u52A1\u662F\u5426\u6B63\u5E38
sqlhelper.3=\u83B7\u53D6\u8868\u7684split\u4FE1\u606F\u65F6\u88AB\u4E2D\u65AD\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u8054\u7CFBdatax\u7BA1\u7406\u5458
sqlreadertask.1=\u83B7\u53D6\u8868\u7684\u5217\u51FA\u95EE\u9898\uFF0C\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.2=\u521B\u5EFAphoenix\u7684reader\u51FA\u73B0\u95EE\u9898,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.3=phoenix\u7684reader\u521D\u59CB\u5316\u51FA\u73B0\u95EE\u9898,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.4=phoenix\u7684reader\u521D\u59CB\u5316\u88AB\u4E2D\u65AD,\u8BF7\u91CD\u8BD5
sqlreadertask.5=\u9047\u5230\u4E0D\u53EF\u8BC6\u522B\u7684phoenix\u7C7B\u578B\uFF0C\u8BF7\u8054\u7CFBhbase\u7BA1\u7406\u5458
sqlreadertask.6=\u8BFB\u53D6phoenix\u7684record\u65F6\u51FA\u73B0\u95EE\u9898\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.7=\u8BFB\u53D6phoenix\u7684record\u65F6\u51FA\u73B0\u95EE\u9898\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.8=phoenix\u7684reader close\u5931\u8D25,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
hbaseconfig.1=hbase\u7684\u914D\u7F6E\u4FE1\u606F\u4E0D\u80FD\u4E3A\u7A7A
hbaseconfig.2=hbase\u7684\u914D\u7F6E\u4FE1\u606F\u6709\u95EE\u9898\uFF0C\u8BF7\u53C2\u8003\u6587\u6863\u68C0\u67E5\u4E0B
hbaseconfig.3=zkquorum\u4E0D\u80FD\u4E3A\u7A7A
hbaseconfig.5=table\u7684\u540D\u5B57\u4E0D\u80FD\u4E3A\u7A7A
hbaseconfig.6=column\u53C2\u6570\u6CA1\u6709\u914D\u7F6E
hbaseconfig.7=\u4ECEphoenix\u83B7\u53D6column\u51FA\u9519\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001


@ -0,0 +1,32 @@
errorcode.required_value=\u60A8\u7F3A\u5931\u4E86\u5FC5\u987B\u586B\u5199\u7684\u53C2\u6570\u503C.
errorcode.illegal_value=\u60A8\u586B\u5199\u7684\u53C2\u6570\u503C\u4E0D\u5408\u6CD5.
errorcode.get_phoenix_table_columns_error=\u83B7\u53D6\u8868\u7684\u5217\u51FA\u9519.
errorcode.get_phoenix_connectioninfo_error=\u83B7\u53D6phoenix\u7684connectioninfo\u51FA\u9519.
errorcode.get_phoenix_splits_error=\u83B7\u53D6phoenix\u7684split\u4FE1\u606F\u65F6\u51FA\u9519.
errorcode.get_phoenix_createreader_error=\u521B\u5EFAphoenix\u7684split\u7684reader\u65F6\u51FA\u9519.
errorcode.get_phoenix_readerinit_error=phoenix\u7684split\u7684reader\u521D\u59CB\u5316\u65F6\u51FA\u9519.
errorcode.get_phoenix_column_typeconvert_error=\u5C06phoenix\u5217\u7684\u7C7B\u578B\u8F6C\u6362\u4E3Adatax\u7684\u7C7B\u578B\u65F6\u51FA\u9519.
errorcode.get_phoenix_record_read_error=\u8BFB\u53D6phoenix\u5177\u4F53\u7684\u4E00\u884C\u65F6\u51FA\u9519.
errorcode.get_phoenix_reader_close_error=\u5173\u95EDphoenix\u3000reader\u65F6\u51FA\u9519.
sqlhelper.1=\u901A\u8FC7zkURL\u83B7\u53D6phoenix\u7684connectioninfo\u51FA\u9519\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u670D\u52A1\u662F\u5426\u6B63\u5E38
sqlhelper.2=\u83B7\u53D6\u8868\u7684split\u4FE1\u606F\u65F6\u51FA\u73B0\u4E86\u5F02\u5E38\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u670D\u52A1\u662F\u5426\u6B63\u5E38
sqlhelper.3=\u83B7\u53D6\u8868\u7684split\u4FE1\u606F\u65F6\u88AB\u4E2D\u65AD\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u8054\u7CFBdatax\u7BA1\u7406\u5458
sqlreadertask.1=\u83B7\u53D6\u8868\u7684\u5217\u51FA\u95EE\u9898\uFF0C\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.2=\u521B\u5EFAphoenix\u7684reader\u51FA\u73B0\u95EE\u9898,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.3=phoenix\u7684reader\u521D\u59CB\u5316\u51FA\u73B0\u95EE\u9898,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.4=phoenix\u7684reader\u521D\u59CB\u5316\u88AB\u4E2D\u65AD,\u8BF7\u91CD\u8BD5
sqlreadertask.5=\u9047\u5230\u4E0D\u53EF\u8BC6\u522B\u7684phoenix\u7C7B\u578B\uFF0C\u8BF7\u8054\u7CFBhbase\u7BA1\u7406\u5458
sqlreadertask.6=\u8BFB\u53D6phoenix\u7684record\u65F6\u51FA\u73B0\u95EE\u9898\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.7=\u8BFB\u53D6phoenix\u7684record\u65F6\u51FA\u73B0\u95EE\u9898\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.8=phoenix\u7684reader close\u5931\u8D25,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
hbaseconfig.1=hbase\u7684\u914D\u7F6E\u4FE1\u606F\u4E0D\u80FD\u4E3A\u7A7A
hbaseconfig.2=hbase\u7684\u914D\u7F6E\u4FE1\u606F\u6709\u95EE\u9898\uFF0C\u8BF7\u53C2\u8003\u6587\u6863\u68C0\u67E5\u4E0B
hbaseconfig.3=zkquorum\u4E0D\u80FD\u4E3A\u7A7A
hbaseconfig.5=table\u7684\u540D\u5B57\u4E0D\u80FD\u4E3A\u7A7A
hbaseconfig.6=column\u53C2\u6570\u6CA1\u6709\u914D\u7F6E
hbaseconfig.7=\u4ECEphoenix\u83B7\u53D6column\u51FA\u9519\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001


@ -0,0 +1,32 @@
errorcode.required_value=\u60A8\u7F3A\u5931\u4E86\u5FC5\u987B\u586B\u5199\u7684\u53C2\u6570\u503C.
errorcode.illegal_value=\u60A8\u586B\u5199\u7684\u53C2\u6570\u503C\u4E0D\u5408\u6CD5.
errorcode.get_phoenix_table_columns_error=\u83B7\u53D6\u8868\u7684\u5217\u51FA\u9519.
errorcode.get_phoenix_connectioninfo_error=\u83B7\u53D6phoenix\u7684connectioninfo\u51FA\u9519.
errorcode.get_phoenix_splits_error=\u83B7\u53D6phoenix\u7684split\u4FE1\u606F\u65F6\u51FA\u9519.
errorcode.get_phoenix_createreader_error=\u521B\u5EFAphoenix\u7684split\u7684reader\u65F6\u51FA\u9519.
errorcode.get_phoenix_readerinit_error=phoenix\u7684split\u7684reader\u521D\u59CB\u5316\u65F6\u51FA\u9519.
errorcode.get_phoenix_column_typeconvert_error=\u5C06phoenix\u5217\u7684\u7C7B\u578B\u8F6C\u6362\u4E3Adatax\u7684\u7C7B\u578B\u65F6\u51FA\u9519.
errorcode.get_phoenix_record_read_error=\u8BFB\u53D6phoenix\u5177\u4F53\u7684\u4E00\u884C\u65F6\u51FA\u9519.
errorcode.get_phoenix_reader_close_error=\u5173\u95EDphoenix\u3000reader\u65F6\u51FA\u9519.
sqlhelper.1=\u901A\u8FC7zkURL\u83B7\u53D6phoenix\u7684connectioninfo\u51FA\u9519\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u670D\u52A1\u662F\u5426\u6B63\u5E38
sqlhelper.2=\u83B7\u53D6\u8868\u7684split\u4FE1\u606F\u65F6\u51FA\u73B0\u4E86\u5F02\u5E38\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u670D\u52A1\u662F\u5426\u6B63\u5E38
sqlhelper.3=\u83B7\u53D6\u8868\u7684split\u4FE1\u606F\u65F6\u88AB\u4E2D\u65AD\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u8054\u7CFBdatax\u7BA1\u7406\u5458
sqlreadertask.1=\u83B7\u53D6\u8868\u7684\u5217\u51FA\u95EE\u9898\uFF0C\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.2=\u521B\u5EFAphoenix\u7684reader\u51FA\u73B0\u95EE\u9898,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.3=phoenix\u7684reader\u521D\u59CB\u5316\u51FA\u73B0\u95EE\u9898,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.4=phoenix\u7684reader\u521D\u59CB\u5316\u88AB\u4E2D\u65AD,\u8BF7\u91CD\u8BD5
sqlreadertask.5=\u9047\u5230\u4E0D\u53EF\u8BC6\u522B\u7684phoenix\u7C7B\u578B\uFF0C\u8BF7\u8054\u7CFBhbase\u7BA1\u7406\u5458
sqlreadertask.6=\u8BFB\u53D6phoenix\u7684record\u65F6\u51FA\u73B0\u95EE\u9898\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.7=\u8BFB\u53D6phoenix\u7684record\u65F6\u51FA\u73B0\u95EE\u9898\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
sqlreadertask.8=phoenix\u7684reader close\u5931\u8D25,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001
hbaseconfig.1=hbase\u7684\u914D\u7F6E\u4FE1\u606F\u4E0D\u80FD\u4E3A\u7A7A
hbaseconfig.2=hbase\u7684\u914D\u7F6E\u4FE1\u606F\u6709\u95EE\u9898\uFF0C\u8BF7\u53C2\u8003\u6587\u6863\u68C0\u67E5\u4E0B
hbaseconfig.3=zkquorum\u4E0D\u80FD\u4E3A\u7A7A
hbaseconfig.5=table\u7684\u540D\u5B57\u4E0D\u80FD\u4E3A\u7A7A
hbaseconfig.6=column\u53C2\u6570\u6CA1\u6709\u914D\u7F6E
hbaseconfig.7=\u4ECEphoenix\u83B7\u53D6column\u51FA\u9519\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001


@ -0,0 +1,7 @@
{
"name": "hbase11xsqlreader",
"class": "com.alibaba.datax.plugin.reader.hbase11xsqlreader.HbaseSQLReader",
"description": "useScene: prod. mechanism: Scan to read data.",
"developer": "liwei.li, bug reported to : liwei.li@alibaba-inc.com"
}


@ -0,0 +1,13 @@
{
"name": "hbase11sqlreader",
"parameter": {
"hbaseConfig": {
"hbase.zookeeper.quorum": "hb-proxy-pub-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-003.hbase.rds.aliyuncs.com"
},
"table": "TABLE1",
"column": [
"ID",
"COL1"
]
}
}


@ -0,0 +1,40 @@
package com.alibaba.datax.plugin.reader.hbase11xsqlreader;
import com.alibaba.datax.common.util.Configuration;
import org.junit.Test;
import java.util.List;
import static junit.framework.Assert.assertEquals;
/**
* Created by shf on 16/7/20.
*/
public class HbaseSQLHelperTest {
private String jsonStr = "{\n" +
" \"hbaseConfig\": {\n" +
" \"hbase.zookeeper.quorum\": \"hb-proxy-pub-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-003.hbase.rds.aliyuncs.com\"\n" +
" },\n" +
" \"table\": \"TABLE1\",\n" +
" \"column\": []\n" +
" }";
@Test
public void testParseConfig() {
Configuration config = Configuration.from(jsonStr);
HbaseSQLReaderConfig readerConfig = HbaseSQLHelper.parseConfig(config);
System.out.println("tablenae = " +readerConfig.getTableName() +",zk = " +readerConfig.getZkUrl());
assertEquals("TABLE1", readerConfig.getTableName());
assertEquals("hb-proxy-pub-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-003.hbase.rds.aliyuncs.com:2181", readerConfig.getZkUrl());
}
@Test
public void testSplit() {
Configuration config = Configuration.from(jsonStr);
HbaseSQLReaderConfig readerConfig = HbaseSQLHelper.parseConfig(config);
List<Configuration> splits = HbaseSQLHelper.split(readerConfig);
System.out.println("split size = " + splits.size());
}
}


@ -0,0 +1,78 @@
package com.alibaba.datax.plugin.reader.hbase11xsqlreader;
import com.alibaba.datax.common.element.*;
import com.alibaba.datax.common.plugin.RecordSender;
import com.alibaba.datax.common.util.Configuration;
import com.alibaba.datax.core.transport.record.DefaultRecord;
import org.junit.Test;
import java.io.IOException;
import java.util.List;
import static junit.framework.Assert.assertEquals;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
public class HbaseSQLReaderTaskTest {
private String jsonStr = "{\n" +
" \"hbaseConfig\": {\n" +
" \"hbase.zookeeper.quorum\": \"hb-proxy-pub-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-003.hbase.rds.aliyuncs.com\"\n" +
" },\n" +
" \"table\": \"TABLE1\",\n" +
" \"column\": []\n" +
" }";
private List<Configuration> generateSplitConfig() throws IOException, InterruptedException {
Configuration config = Configuration.from(jsonStr);
HbaseSQLReaderConfig readerConfig = HbaseSQLHelper.parseConfig(config);
List<Configuration> splits = HbaseSQLHelper.split(readerConfig);
System.out.println("split size = " + splits.size());
return splits;
}
@Test
public void testReadRecord() throws Exception {
List<Configuration> splits = this.generateSplitConfig();
int allRecordNum = 0;
for (int i = 0; i < splits.size(); i++) {
RecordSender recordSender = mock(RecordSender.class);
when(recordSender.createRecord()).thenReturn(new DefaultRecord());
Record record = recordSender.createRecord();
HbaseSQLReaderTask hbase11SQLReaderTask = new HbaseSQLReaderTask(splits.get(i));
hbase11SQLReaderTask.init();
hbase11SQLReaderTask.prepare();
int num = 0;
while (true) {
boolean hasLine = false;
try {
hasLine = hbase11SQLReaderTask.readRecord(record);
} catch (Exception e) {
e.printStackTrace();
throw e;
}
if (!hasLine)
break;
num++;
if (num % 100 == 0)
System.out.println("record num is :" + num + ",record is " + record.toString());
when(recordSender.createRecord()).thenReturn(new DefaultRecord());
String recordStr = "";
for (int j = 0; j < record.getColumnNumber(); j++) {
recordStr += record.getColumn(j).asString() + ",";
}
recordSender.sendToWriter(record);
record = recordSender.createRecord();
}
System.out.println("split id is " + i + ",record num = " + num);
allRecordNum += num;
recordSender.flush();
hbase11SQLReaderTask.destroy();
}
System.out.println("all record num = " + allRecordNum);
assertEquals(10000, allRecordNum);
}
}


@ -294,6 +294,13 @@
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>hbase11xsqlreader/target/datax/</directory>
<includes>
<include>**/*.*</include>
</includes>
<outputDirectory>datax</outputDirectory>
</fileSet>
<fileSet>
<directory>elasticsearchwriter/target/datax/</directory>
<includes>


@ -83,6 +83,7 @@
<module>hbase11xwriter</module>
<module>hbase094xwriter</module>
<module>hbase11xsqlwriter</module>
<module>hbase11xsqlreader</module>
<module>elasticsearchwriter</module>
<!-- common support module -->