From b901b62de2fc010bed264a43267a5fe3723b329f Mon Sep 17 00:00:00 2001 From: muyuan Date: Wed, 7 Nov 2018 20:18:07 +0800 Subject: [PATCH] add hbase11xsqlreader --- README.md | 1 + hbase11xsqlreader/doc/hbase11xsqlreader.md | 252 ++++++++++++++++++ hbase11xsqlreader/pom.xml | 109 ++++++++ .../src/main/assembly/package.xml | 35 +++ .../HadoopSerializationUtil.java | 31 +++ .../hbase11xsqlreader/HbaseSQLHelper.java | 125 +++++++++ .../hbase11xsqlreader/HbaseSQLReader.java | 89 +++++++ .../HbaseSQLReaderConfig.java | 164 ++++++++++++ .../HbaseSQLReaderErrorCode.java | 40 +++ .../hbase11xsqlreader/HbaseSQLReaderTask.java | 178 +++++++++++++ .../plugin/reader/hbase11xsqlreader/Key.java | 28 ++ .../hbase11xsqlreader/LocalStrings.properties | 32 +++ .../LocalStrings_en_US.properties | 32 +++ .../LocalStrings_ja_JP.properties | 32 +++ .../LocalStrings_zh_CN.properties | 32 +++ .../src/main/resources/plugin.json | 7 + .../main/resources/plugin_job_template.json | 13 + .../hbase11xsqlreader/HbaseSQLHelperTest.java | 40 +++ .../HbaseSQLReaderTaskTest.java | 78 ++++++ package.xml | 7 + pom.xml | 1 + 21 files changed, 1326 insertions(+) create mode 100644 hbase11xsqlreader/doc/hbase11xsqlreader.md create mode 100644 hbase11xsqlreader/pom.xml create mode 100644 hbase11xsqlreader/src/main/assembly/package.xml create mode 100644 hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HadoopSerializationUtil.java create mode 100644 hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java create mode 100644 hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReader.java create mode 100644 hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderConfig.java create mode 100644 hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderErrorCode.java create mode 100644 hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTask.java create mode 100644 hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/Key.java create mode 100644 hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings.properties create mode 100644 hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings_en_US.properties create mode 100644 hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings_ja_JP.properties create mode 100644 hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings_zh_CN.properties create mode 100644 hbase11xsqlreader/src/main/resources/plugin.json create mode 100644 hbase11xsqlreader/src/main/resources/plugin_job_template.json create mode 100644 hbase11xsqlreader/src/test/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelperTest.java create mode 100644 hbase11xsqlreader/src/test/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTaskTest.java diff --git a/README.md b/README.md index 60370693..f12bb021 100644 --- a/README.md +++ b/README.md @@ -47,6 +47,7 @@ DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、N | NoSQL数据存储 | OTS | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/otsreader/doc/otsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/otswriter/doc/otswriter.md)| | | Hbase0.94 | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/hbase094xreader/doc/hbase094xreader.md) 
、[写](https://github.com/alibaba/DataX/blob/master/hbase094xwriter/doc/hbase094xwriter.md)| | | Hbase1.1 | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/hbase11xreader/doc/hbase11xreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase11xwriter/doc/hbase11xwriter.md)| +| | Phoenix4.x | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/hbase11xsqlreader/doc/hbase11xsqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase11xsqlwriter/doc/hbase11xsqlwriter.md)| | | MongoDB | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/mongoreader/doc/mongoreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mongowriter/doc/mongowriter.md)| | | Hive | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md)| | 无结构化数据存储 | TxtFile | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md)| diff --git a/hbase11xsqlreader/doc/hbase11xsqlreader.md b/hbase11xsqlreader/doc/hbase11xsqlreader.md new file mode 100644 index 00000000..03261a1f --- /dev/null +++ b/hbase11xsqlreader/doc/hbase11xsqlreader.md @@ -0,0 +1,252 @@ +# hbase11xsqlreader 插件文档 + + +___ + + + +## 1 快速介绍 + +hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实现上,hbase11xsqlreader通过Phoenix客户端去连接远程的HBase集群,并执行相应的sql语句将数据从Phoenix库中SELECT出来。 + + +## 2 实现原理 + +简而言之,hbase11xsqlreader通过Phoenix客户端去连接远程的HBase集群,并根据用户配置的信息生成查询SELECT 语句,然后发送到HBase集群,并将返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。 +# hbase11xsqlreader 插件文档 + + +___ + + + +## 1 快速介绍 + +hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实现上,hbase11xsqlreader通过Phoenix客户端去连接远程的HBase集群,并执行相应的sql语句将数据从Phoenix库中SELECT出来。 + + +## 2 实现原理 + +简而言之,hbase11xsqlreader通过Phoenix客户端去连接远程的HBase集群,并根据用户配置的信息生成查询SELECT 语句,然后发送到HBase集群,并将返回结果使用DataX自定义的数据类型拼装为抽象的数据集,并传递给下游Writer处理。 + + +## 3 功能说明 + +### 3.1 配置样例 + +* 配置一个从Phoenix同步抽取数据到本地的作业: + +``` +{ + "job": { + "setting": { + "speed": { + //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它. + "byte":10485760 + }, + //出错限制 + "errorLimit": { + //出错的record条数上限,当大于该值即报错。 + "record": 0, + //出错的record百分比上限 1.0表示100%,0.02表示2% + "percentage": 0.02 + } + }, + "content": [ { + "reader": { + //指定插件为hbase11xsqlreader + "name": "hbase11xsqlreader", + "parameter": { + //填写连接Phoenix的hbase集群zk地址 + "hbaseConfig": { + "hbase.zookeeper.quorum": "hb-proxy-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-xxx-003.hbase.rds.aliyuncs.com" + }, + //填写要读取的phoenix的表名 + "table": "US_POPULATION", + //填写要读取的列名,不填读取所有列 + "column": [ + ] + } + }, + "writer": { + //writer类型 + "name": "streamwriter", + //是否打印内容 + "parameter": { + "print":true, + "encoding": "UTF-8" + } + } + } + ] + } +} +``` + + +### 3.2 参数说明 + +* **hbaseConfig** + + * 描述:hbase11xsqlreader需要通过Phoenix客户端去连接hbase集群,因此这里需要填写对应hbase集群的zkurl地址,注意不要添加2181。 + + * 必选:是
+ + * 默认值:无
+ +* **table** + + * 描述:填写Phoenix中的表名,如果有namespace,该值设置为'namespace.tablename' + + * 必选:是 <br />
+ + * 默认值:无
+ +* **column** + + * 描述:填写需要从phoenix表中读取的列名集合,使用JSON的数组描述字段信息,空值表示读取所有列。 + + * 必选:是
+ + * 默认值:无
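
下面补充一个 reader 的 parameter 配置片段,演示带 namespace 的表名以及显式指定列的写法(其中的 zk 地址、表名、列名均为示意值,请按实际环境替换):

```
"parameter": {
    //填写连接Phoenix的hbase集群zk地址(示意值)
    "hbaseConfig": {
        "hbase.zookeeper.quorum": "zk-host-001,zk-host-002,zk-host-003"
    },
    //有namespace时,table写成 'namespace.tablename' 的形式
    "table": "MY_NS.US_POPULATION",
    //显式列出需要读取的列;若配置为空数组 [],则读取表中所有列
    "column": [
        "STATE",
        "CITY",
        "POPULATION"
    ]
}
```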
+ + +### 3.3 类型转换 + +目前hbase11xsqlreader支持大部分Phoenix类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。 + +下面列出MysqlReader针对Mysql类型转换列表: + + +| DataX 内部类型| Phoenix 数据类型 | +| -------- | ----- | +| String |CHAR, VARCHAR| +| Bytes |BINARY, VARBINARY| +| Bool |BOOLEAN | +| Long |INTEGER, TINYINT, SMALLINT, BIGINT | +| Double |FLOAT, DECIMAL, DOUBLE, | +| Date |DATE, TIME, TIMESTAMP | + + + +## 4 性能报告 + +略 + +## 5 约束限制 +略 +## 6 FAQ + +*** + + + +## 3 功能说明 + +### 3.1 配置样例 + +* 配置一个从Phoenix同步抽取数据到本地的作业: + +``` +{ + "job": { + "setting": { + "speed": { + //设置传输速度,单位为byte/s,DataX运行会尽可能达到该速度但是不超过它. + "byte":10485760 + }, + //出错限制 + "errorLimit": { + //出错的record条数上限,当大于该值即报错。 + "record": 0, + //出错的record百分比上限 1.0表示100%,0.02表示2% + "percentage": 0.02 + } + }, + "content": [ { + "reader": { + //指定插件为hbase11xsqlreader + "name": "hbase11xsqlreader", + "parameter": { + //填写连接Phoenix的hbase集群zk地址 + "hbaseConfig": { + "hbase.zookeeper.quorum": "hb-proxy-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-xxx-003.hbase.rds.aliyuncs.com" + }, + //填写要读取的phoenix的表名 + "table": "US_POPULATION", + //填写要读取的列名,不填读取所有列 + "column": [ + ] + } + }, + "writer": { + //writer类型 + "name": "streamwriter", + //是否打印内容 + "parameter": { + "print":true, + "encoding": "UTF-8" + } + } + } + ] + } +} +``` + + +### 3.2 参数说明 + +* **hbaseConfig** + + * 描述:hbase11xsqlreader需要通过Phoenix客户端去连接hbase集群,因此这里需要填写对应hbase集群的zkurl地址,注意不要添加2181。 + + * 必选:是
+ + * 默认值:无
+ +* **table** + + * 描述:填写Phoenix中的表名,如果有namespace,该值设置为'namespace.tablename' + + * 必选:是 <br />
+ + * 默认值:无
+ +* **column** + + * 描述:填写需要从phoenix表中读取的列名集合,使用JSON的数组描述字段信息,空值表示读取所有列。 + + * 必选:是
+ + * 默认值:无
+ + +### 3.3 类型转换 + +目前hbase11xsqlreader支持大部分Phoenix类型,但也存在部分个别类型没有支持的情况,请注意检查你的类型。 + +下面列出MysqlReader针对Mysql类型转换列表: + + +| DataX 内部类型| Phoenix 数据类型 | +| -------- | ----- | +| String |CHAR, VARCHAR| +| Bytes |BINARY, VARBINARY| +| Bool |BOOLEAN | +| Long |INTEGER, TINYINT, SMALLINT, BIGINT | +| Double |FLOAT, DECIMAL, DOUBLE, | +| Date |DATE, TIME, TIMESTAMP | + + + +## 4 性能报告 + +略 + +## 5 约束限制 +略 +## 6 FAQ + +*** + diff --git a/hbase11xsqlreader/pom.xml b/hbase11xsqlreader/pom.xml new file mode 100644 index 00000000..6ea727c0 --- /dev/null +++ b/hbase11xsqlreader/pom.xml @@ -0,0 +1,109 @@ + + 4.0.0 + + com.alibaba.datax + datax-all + 0.0.1-SNAPSHOT + + hbase11xsqlreader + hbase11xsqlreader + 0.0.1-SNAPSHOT + jar + + + 4.12.0-AliHBase-1.1-0.5 + + + + + com.alibaba.datax + datax-common + ${datax-project-version} + + + slf4j-log4j12 + org.slf4j + + + + + + com.aliyun.phoenix + ali-phoenix-core + ${phoenix.version} + + + servlet-api + javax.servlet + + + + + + + junit + junit + test + + + org.mockito + mockito-core + 2.0.44-beta + test + + + com.alibaba.datax + datax-core + ${datax-project-version} + + + com.alibaba.datax + datax-service-face + + + test + + + + + + + src/main/java + + **/*.properties + + + + + + + maven-compiler-plugin + + 1.6 + 1.6 + ${project-sourceEncoding} + + + + maven-assembly-plugin + + + src/main/assembly/package.xml + + datax + + + + dwzip + package + + single + + + + + + + + diff --git a/hbase11xsqlreader/src/main/assembly/package.xml b/hbase11xsqlreader/src/main/assembly/package.xml new file mode 100644 index 00000000..b5c1e19b --- /dev/null +++ b/hbase11xsqlreader/src/main/assembly/package.xml @@ -0,0 +1,35 @@ + + + + dir + + false + + + src/main/resources + + plugin.json + plugin_job_template.json + + plugin/reader/hbase11xsqlreader + + + target/ + + hbase11xsqlreader-0.0.1-SNAPSHOT.jar + + plugin/reader/hbase11xsqlreader + + + + + + false + plugin/reader/hbase11xsqlreader/libs + runtime + + + diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HadoopSerializationUtil.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HadoopSerializationUtil.java new file mode 100644 index 00000000..45907123 --- /dev/null +++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HadoopSerializationUtil.java @@ -0,0 +1,31 @@ +package com.alibaba.datax.plugin.reader.hbase11xsqlreader; + + +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; + +public class HadoopSerializationUtil { + + public static byte[] serialize(Writable writable) throws IOException { + ByteArrayOutputStream out = new ByteArrayOutputStream(); + DataOutputStream dataout = new DataOutputStream(out); + writable.write(dataout); + dataout.close(); + return out.toByteArray(); + } + + public static void deserialize(Writable writable, byte[] bytes) throws Exception { + + ByteArrayInputStream in = new ByteArrayInputStream(bytes); + DataInputStream datain = new DataInputStream(in); + writable.readFields(datain); + datain.close(); + } + + +} \ No newline at end of file diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java new file mode 100644 index 00000000..2aacdddf --- /dev/null +++ 
b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java @@ -0,0 +1,125 @@ +package com.alibaba.datax.plugin.reader.hbase11xsqlreader; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.fastjson.JSON; +import com.alibaba.fastjson.TypeReference; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.util.Pair; +import org.apache.hadoop.mapreduce.InputSplit; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.task.JobContextImpl; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver; +import org.apache.phoenix.mapreduce.PhoenixInputFormat; +import org.apache.phoenix.mapreduce.PhoenixInputSplit; +import org.apache.phoenix.mapreduce.PhoenixRecordWritable; +import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil; +import org.apache.phoenix.schema.MetaDataClient; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.SaltingUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DriverManager; +import java.sql.SQLException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; + + +public class HbaseSQLHelper { + private static final Logger LOG = LoggerFactory.getLogger(HbaseSQLHelper.class); + + public static org.apache.hadoop.conf.Configuration generatePhoenixConf(HbaseSQLReaderConfig readerConfig) { + org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration(); + + String table = readerConfig.getTableName(); + List columns = readerConfig.getColumns(); + String zkUrl = readerConfig.getZkUrl(); + + PhoenixConfigurationUtil.setInputClass(conf, PhoenixRecordWritable.class); + PhoenixConfigurationUtil.setInputTableName(conf, table); + + if (!columns.isEmpty()) { + PhoenixConfigurationUtil.setSelectColumnNames(conf, columns.toArray(new String[columns.size()])); + } + PhoenixEmbeddedDriver.ConnectionInfo info = null; + try { + info = PhoenixEmbeddedDriver.ConnectionInfo.create(zkUrl); + } catch (SQLException e) { + throw DataXException.asDataXException( + HbaseSQLReaderErrorCode.GET_PHOENIX_CONNECTIONINFO_ERROR, "通过zkURL获取phoenix的connectioninfo出错,请检查hbase集群服务是否正常", e); + } + conf.set(HConstants.ZOOKEEPER_QUORUM, info.getZookeeperQuorum()); + if (info.getPort() != null) + conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, info.getPort()); + if (info.getRootNode() != null) + conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, info.getRootNode()); + return conf; + } + + public static List getPColumnNames(String connectionString, String tableName) throws SQLException { + Connection con = + DriverManager.getConnection(connectionString); + PhoenixConnection phoenixConnection = con.unwrap(PhoenixConnection.class); + MetaDataClient metaDataClient = new MetaDataClient(phoenixConnection); + PTable table = metaDataClient.updateCache("", tableName).getTable(); + List columnNames = new ArrayList(); + for (PColumn pColumn : table.getColumns()) { + if (!pColumn.getName().getString().equals(SaltingUtil.SALTING_COLUMN_NAME)) + columnNames.add(pColumn.getName().getString()); + else + LOG.info(tableName + " is salt table"); + } + return columnNames; + } + + + public static List split(HbaseSQLReaderConfig readerConfig) { + PhoenixInputFormat inputFormat = new PhoenixInputFormat(); + 
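        // 切分思路:先由reader配置生成Phoenix所需的Hadoop Configuration,再用PhoenixInputFormat计算出各个InputSplit,
        // 最后把每个split序列化并做Base64编码,写入对应Task配置的Key.SPLIT_KEY,供Task端还原出自己要读取的分片。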
org.apache.hadoop.conf.Configuration conf = generatePhoenixConf(readerConfig); + JobID jobId = new JobID(Key.MOCK_JOBID_IDENTIFIER, Key.MOCK_JOBID); + JobContextImpl jobContext = new JobContextImpl(conf, jobId); + List resultConfigurations = new ArrayList(); + List rawSplits = null; + try { + rawSplits = inputFormat.getSplits(jobContext); + LOG.info("split size is " + rawSplits.size()); + for (InputSplit split : rawSplits) { + Configuration cfg = readerConfig.getOriginalConfig().clone(); + + byte[] splitSer = HadoopSerializationUtil.serialize((PhoenixInputSplit) split); + String splitBase64Str = org.apache.commons.codec.binary.Base64.encodeBase64String(splitSer); + cfg.set(Key.SPLIT_KEY, splitBase64Str); + resultConfigurations.add(cfg); + } + } catch (IOException e) { + throw DataXException.asDataXException( + HbaseSQLReaderErrorCode.GET_PHOENIX_SPLITS_ERROR, "获取表的split信息时出现了异常,请检查hbase集群服务是否正常," + e.getMessage(), e); + } catch (InterruptedException e) { + throw DataXException.asDataXException( + HbaseSQLReaderErrorCode.GET_PHOENIX_SPLITS_ERROR, "获取表的split信息时被中断,请重试,若还有问题请联系datax管理员," + e.getMessage(), e); + } + + return resultConfigurations; + } + + public static HbaseSQLReaderConfig parseConfig(Configuration cfg) { + return HbaseSQLReaderConfig.parse(cfg); + } + + public static Pair getHbaseConfig(String hbaseCfgString) { + assert hbaseCfgString != null; + Map hbaseConfigMap = JSON.parseObject(hbaseCfgString, new TypeReference>() { + }); + String zkQuorum = hbaseConfigMap.get(Key.HBASE_ZK_QUORUM); + String znode = hbaseConfigMap.get(Key.HBASE_ZNODE_PARENT); + if(znode == null) + znode = ""; + return new Pair(zkQuorum, znode); + } +} diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReader.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReader.java new file mode 100644 index 00000000..c53e77fb --- /dev/null +++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReader.java @@ -0,0 +1,89 @@ +package com.alibaba.datax.plugin.reader.hbase11xsqlreader; + +import com.alibaba.datax.common.element.*; +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.spi.Reader; +import com.alibaba.datax.common.util.Configuration; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.List; + +public class HbaseSQLReader extends Reader { + public static class Job extends Reader.Job { + private HbaseSQLReaderConfig readerConfig; + + @Override + public void init() { + readerConfig = HbaseSQLHelper.parseConfig(this.getPluginJobConf()); + } + + @Override + public List split(int adviceNumber) { + return HbaseSQLHelper.split(readerConfig); + } + + + @Override + public void destroy() { + + } + + } + + public static class Task extends Reader.Task { + private static Logger LOG = LoggerFactory.getLogger(Task.class); + private HbaseSQLReaderTask hbase11SQLReaderTask; + + @Override + public void init() { + hbase11SQLReaderTask = new HbaseSQLReaderTask(this.getPluginJobConf()); + this.hbase11SQLReaderTask.init(); + } + + @Override + public void prepare() { + hbase11SQLReaderTask.prepare(); + } + + + @Override + public void startRead(RecordSender recordSender) { + Long recordNum = 0L; + Record record = recordSender.createRecord(); + boolean fetchOK; + while (true) { + try { + fetchOK = this.hbase11SQLReaderTask.readRecord(record); + } catch (Exception e) { + LOG.info("Read record exception", e); + e.printStackTrace(); + 
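                        // 读取单行失败时,将该行作为脏数据交给TaskPluginCollector,然后新建Record继续读取,不中断整个任务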
super.getTaskPluginCollector().collectDirtyRecord(record, e); + record = recordSender.createRecord(); + continue; + } + if (fetchOK) { + recordSender.sendToWriter(record); + recordNum++; + if (recordNum % 10000 == 0) + LOG.info("already read record num is " + recordNum); + record = recordSender.createRecord(); + } else { + break; + } + } + recordSender.flush(); + } + + @Override + public void post() { + super.post(); + } + + @Override + public void destroy() { + this.hbase11SQLReaderTask.destroy(); + } + } + +} diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderConfig.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderConfig.java new file mode 100644 index 00000000..ab06f6e1 --- /dev/null +++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderConfig.java @@ -0,0 +1,164 @@ +package com.alibaba.datax.plugin.reader.hbase11xsqlreader; + +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.hbase.util.Pair; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.sql.SQLException; +import java.util.List; + +public class HbaseSQLReaderConfig { + private final static Logger LOG = LoggerFactory.getLogger(HbaseSQLReaderConfig.class); + private Configuration originalConfig; // 原始的配置数据 + + // 集群配置 + private String connectionString; + + public String getZkUrl() { + return zkUrl; + } + + private String zkUrl; + + // 表配置 + private String tableName; + private List columns; // 目的表的所有列的列名,包括主键和非主键,不包括时间列 + + /** + * @return 获取原始的datax配置 + */ + public Configuration getOriginalConfig() { + return originalConfig; + } + + /** + * @return 获取连接字符串,使用ZK模式 + */ + public String getConnectionString() { + return connectionString; + } + + /** + * @return 获取表名 + */ + public String getTableName() { + return tableName; + } + + /** + * @return 返回所有的列,包括主键列和非主键列,但不包括version列 + */ + public List getColumns() { + return columns; + } + + /** + * @param dataxCfg + * @return + */ + public static HbaseSQLReaderConfig parse(Configuration dataxCfg) { + assert dataxCfg != null; + HbaseSQLReaderConfig cfg = new HbaseSQLReaderConfig(); + cfg.originalConfig = dataxCfg; + + // 1. 解析集群配置 + parseClusterConfig(cfg, dataxCfg); + + // 2. 解析列配置 + parseTableConfig(cfg, dataxCfg); + + // 4. 打印解析出来的配置 + LOG.info("HBase SQL reader config parsed:" + cfg.toString()); + + return cfg; + } + + private static void parseClusterConfig(HbaseSQLReaderConfig cfg, Configuration dataxCfg) { + // 获取hbase集群的连接信息字符串 + String hbaseCfg = dataxCfg.getString(Key.HBASE_CONFIG); + if (StringUtils.isBlank(hbaseCfg)) { + // 集群配置必须存在且不为空 + throw DataXException.asDataXException( + HbaseSQLReaderErrorCode.REQUIRED_VALUE, "读 Hbase 时需要配置hbaseConfig,其内容为 Hbase 连接信息,请查看 Hbase 集群信息."); + } + + // 解析zk服务器和znode信息 + Pair zkCfg; + try { + zkCfg = HbaseSQLHelper.getHbaseConfig(hbaseCfg); + } catch (Throwable t) { + // 解析hbase配置错误 + throw DataXException.asDataXException( + HbaseSQLReaderErrorCode.REQUIRED_VALUE, "解析hbaseConfig出错,请确认您配置的hbaseConfig为合法的json数据格式,内容正确." 
); + } + String zkQuorum = zkCfg.getFirst(); + String znode = zkCfg.getSecond(); + if (zkQuorum == null || zkQuorum.isEmpty()) { + throw DataXException.asDataXException( + HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "HBase的hbase.zookeeper.quorum配置不能为空" ); + } + // 生成sql使用的连接字符串, 格式: jdbc:hbase:zk_quorum:2181:/znode_parent + cfg.connectionString = "jdbc:phoenix:" + zkQuorum; + cfg.zkUrl = zkQuorum + ":2181"; + if (!znode.isEmpty()) { + cfg.connectionString += cfg.connectionString + ":" + znode; + cfg.zkUrl += cfg.zkUrl + ":" + znode; + } + } + + private static void parseTableConfig(HbaseSQLReaderConfig cfg, Configuration dataxCfg) { + // 解析并检查表名 + cfg.tableName = dataxCfg.getString(Key.TABLE); + if (cfg.tableName == null || cfg.tableName.isEmpty()) { + throw DataXException.asDataXException( + HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "HBase的tableName配置不能为空,请检查并修改配置." ); + } + + // 解析列配置,列为空时,补全所有的列 + cfg.columns = dataxCfg.getList(Key.COLUMN, String.class); + if (cfg.columns == null) { + throw DataXException.asDataXException( + HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "您配置的tableName含有非法字符{0},请检查您的配置."); + } else if (cfg.columns.isEmpty()) { + try { + cfg.columns = HbaseSQLHelper.getPColumnNames(cfg.connectionString, cfg.tableName); + dataxCfg.set(Key.COLUMN, cfg.columns); + } catch (SQLException e) { + throw DataXException.asDataXException( + HbaseSQLReaderErrorCode.GET_PHOENIX_COLUMN_ERROR, "HBase的columns配置不能为空,请添加目标表的列名配置." + e.getMessage(), e); + } + } + } + + @Override + public String toString() { + StringBuilder ret = new StringBuilder(); + // 集群配置 + ret.append("\n[jdbc]"); + ret.append(connectionString); + ret.append("\n"); + + // 表配置 + ret.append("[tableName]"); + ret.append(tableName); + ret.append("\n"); + ret.append("[column]"); + for (String col : columns) { + ret.append(col); + ret.append(","); + } + ret.setLength(ret.length() - 1); + ret.append("\n"); + + return ret.toString(); + } + + /** + * 禁止直接实例化本类,必须调用{@link #parse}接口来初始化 + */ + private HbaseSQLReaderConfig() { + } +} diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderErrorCode.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderErrorCode.java new file mode 100644 index 00000000..a0623364 --- /dev/null +++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderErrorCode.java @@ -0,0 +1,40 @@ +package com.alibaba.datax.plugin.reader.hbase11xsqlreader; + +import com.alibaba.datax.common.spi.ErrorCode; + +public enum HbaseSQLReaderErrorCode implements ErrorCode { + REQUIRED_VALUE("Hbasewriter-00", "您缺失了必须填写的参数值."), + ILLEGAL_VALUE("Hbasewriter-01", "您填写的参数值不合法."), + GET_PHOENIX_COLUMN_ERROR("Hbasewriter-02", "获取phoenix表的列值错误"), + GET_PHOENIX_CONNECTIONINFO_ERROR("Hbasewriter-03", "获取phoenix服务的zkurl错误"), + GET_PHOENIX_SPLITS_ERROR("Hbasewriter-04", "获取phoenix的split信息错误"), + PHOENIX_CREATEREADER_ERROR("Hbasewriter-05", "获取phoenix的reader错误"), + PHOENIX_READERINIT_ERROR("Hbasewriter-06", "phoenix reader的初始化错误"), + PHOENIX_COLUMN_TYPE_CONVERT_ERROR("Hbasewriter-07", "phoenix的列类型转换错误"), + PHOENIX_RECORD_READ_ERROR("Hbasewriter-08", "phoenix record 读取错误"), + PHOENIX_READER_CLOSE_ERROR("Hbasewriter-09", "phoenix reader 的close错误") + ; + + private final String code; + private final String description; + + private HbaseSQLReaderErrorCode(String code, String description) { + this.code = code; + this.description = description; + } + + @Override + public String getCode() { + return this.code; + } + 
+ @Override + public String getDescription() { + return this.description; + } + + @Override + public String toString() { + return String.format("Code:[%s], Description:[%s].", this.code, this.description); + } +} diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTask.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTask.java new file mode 100644 index 00000000..1ca22c6f --- /dev/null +++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTask.java @@ -0,0 +1,178 @@ +package com.alibaba.datax.plugin.reader.hbase11xsqlreader; + +import com.alibaba.datax.common.element.*; +import com.alibaba.datax.common.exception.DataXException; +import com.alibaba.datax.common.util.Configuration; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.mapreduce.PhoenixInputFormat; +import org.apache.phoenix.mapreduce.PhoenixInputSplit; +import org.apache.phoenix.mapreduce.PhoenixRecordReader; +import org.apache.phoenix.mapreduce.PhoenixRecordWritable; +import org.apache.phoenix.schema.MetaDataClient; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PTable; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.math.BigDecimal; +import java.sql.*; +import java.util.HashMap; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; + +/** + * Created by admin on 1/3/18. + */ +public class HbaseSQLReaderTask { + private static Logger LOG = LoggerFactory.getLogger(HbaseSQLReaderTask.class); + private PhoenixInputFormat phoenixInputFormat; + PhoenixInputSplit phoenixInputSplit; + private PhoenixRecordReader phoenixRecordReader; + private Map pColumns; + private HbaseSQLReaderConfig readerConfig; + private TaskAttemptContextImpl hadoopAttemptContext; + + public HbaseSQLReaderTask(Configuration config) { + this.readerConfig = HbaseSQLHelper.parseConfig(config); + pColumns = new LinkedHashMap(); + } + + private void getPColumns() throws SQLException { + Connection con = + DriverManager.getConnection(this.readerConfig.getConnectionString()); + PhoenixConnection phoenixConnection = con.unwrap(PhoenixConnection.class); + MetaDataClient metaDataClient = new MetaDataClient(phoenixConnection); + PTable table = metaDataClient.updateCache("", this.readerConfig.getTableName()).getTable(); + List columnNames = this.readerConfig.getColumns(); + for (PColumn pColumn : table.getColumns()) { + if (columnNames.contains(pColumn.getName().getString())) { + pColumns.put(pColumn.getName().getString(), pColumn); + } + } + } + + public void init() { + LOG.info("reader table info: " + this.readerConfig.toString()); + try { + this.getPColumns(); + } catch (SQLException e) { + throw DataXException.asDataXException( + HbaseSQLReaderErrorCode.GET_PHOENIX_CONNECTIONINFO_ERROR, "获取表的列出问题,重试,若还有问题请检查hbase集群状态,"+ e.getMessage()); + } + this.phoenixInputFormat = new PhoenixInputFormat(); + String splitBase64Str = this.readerConfig.getOriginalConfig().getString(Key.SPLIT_KEY); + byte[] splitBytes = org.apache.commons.codec.binary.Base64.decodeBase64(splitBase64Str); + TaskAttemptID attemptId = new TaskAttemptID(); + org.apache.hadoop.conf.Configuration conf = HbaseSQLHelper.generatePhoenixConf(this.readerConfig); + this.hadoopAttemptContext = new 
TaskAttemptContextImpl(conf, attemptId); + this.phoenixInputSplit = new PhoenixInputSplit(); + try { + HadoopSerializationUtil.deserialize(phoenixInputSplit, splitBytes); + this.phoenixRecordReader = (PhoenixRecordReader) phoenixInputFormat.createRecordReader(phoenixInputSplit, hadoopAttemptContext); + } catch (Exception e) { + throw DataXException.asDataXException( + HbaseSQLReaderErrorCode.PHOENIX_CREATEREADER_ERROR, "创建phoenix的reader出现问题,请重试,若还有问题请检查hbase集群状态," + e.getMessage()); + } + } + + public void prepare() { + try { + this.phoenixRecordReader.initialize(this.phoenixInputSplit, hadoopAttemptContext); + } catch (IOException e) { + throw DataXException.asDataXException( + HbaseSQLReaderErrorCode.PHOENIX_READERINIT_ERROR, "phoenix的reader初始化出现问题,请重试,若还有问题请检查hbase集群状态" + e.getMessage()); + } catch (InterruptedException e) { + throw DataXException.asDataXException( + HbaseSQLReaderErrorCode.PHOENIX_READERINIT_ERROR, "phoenix的reader初始化被中断,请重试," + e.getMessage()); + } + } + + + private Column convertPhoenixValueToDataxColumn(int sqlType, Object value) throws IOException { + Column column; + switch (sqlType) { + case Types.CHAR: + case Types.VARCHAR: + column = new StringColumn((String) value); + break; + case Types.BINARY: + case Types.VARBINARY: + column = new BytesColumn((byte[]) value); + break; + case Types.BOOLEAN: + column = new BoolColumn((Boolean) value); + break; + case Types.INTEGER: + column = new LongColumn((Integer) value); + break; + case Types.TINYINT: + column = new LongColumn(((Byte) value).longValue()); + break; + case Types.SMALLINT: + column = new LongColumn(((Short) value).longValue()); + break; + case Types.BIGINT: + column = new LongColumn((Long) value); + break; + case Types.FLOAT: + column = new DoubleColumn(((Float) value).doubleValue()); + break; + case Types.DECIMAL: + column = new DoubleColumn(((BigDecimal) value)); + break; + case Types.DOUBLE: + column = new DoubleColumn((Double) value); + break; + case Types.DATE: + column = new DateColumn((Date) value); + break; + case Types.TIME: + column = new DateColumn((Time) value); + break; + case Types.TIMESTAMP: + column = new DateColumn((Timestamp) value); + break; + default: + throw DataXException.asDataXException( + HbaseSQLReaderErrorCode.PHOENIX_COLUMN_TYPE_CONVERT_ERROR, "遇到不可识别的phoenix类型," + "sqlType :" + sqlType); + } + return column; + } + + private void constructRecordFromPhoenix(Record record, Map phoenixRecord) throws IOException { + for (Map.Entry pColumnItem : this.pColumns.entrySet()) { + Column column = this.convertPhoenixValueToDataxColumn( + pColumnItem.getValue().getDataType().getSqlType(), + phoenixRecord.get(pColumnItem.getKey())); + record.addColumn(column); + } + } + + public boolean readRecord(Record record) throws IOException, InterruptedException { + boolean hasNext = false; + hasNext = this.phoenixRecordReader.nextKeyValue(); + if (!hasNext) + return hasNext; + PhoenixRecordWritable phoenixRecordWritable = (PhoenixRecordWritable) this.phoenixRecordReader.getCurrentValue(); + Map phoenixRecord = phoenixRecordWritable.getResultMap(); + this.constructRecordFromPhoenix(record, phoenixRecord); + return hasNext; + } + + public void destroy() { + if (this.phoenixRecordReader != null) { + try { + this.phoenixRecordReader.close(); + } catch (IOException e) { + throw DataXException.asDataXException( + HbaseSQLReaderErrorCode.PHOENIX_READER_CLOSE_ERROR, "phoenix的reader close失败,请重试,若还有问题请检查hbase集群状态" + e.getMessage()); + } + } + } +} + + diff --git 
a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/Key.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/Key.java new file mode 100644 index 00000000..7987d6c8 --- /dev/null +++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/Key.java @@ -0,0 +1,28 @@ +package com.alibaba.datax.plugin.reader.hbase11xsqlreader; + +import org.apache.hadoop.hbase.HConstants; + +public final class Key { + + public final static String MOCK_JOBID_IDENTIFIER = "phoenixreader"; + public final static int MOCK_JOBID = 1; + public final static String SPLIT_KEY = "phoenixsplit"; + + /** + * 【必选】hbase集群配置,连接一个hbase集群需要的最小配置只有两个:zk和znode + */ + public final static String HBASE_CONFIG = "hbaseConfig"; + public final static String HBASE_ZK_QUORUM = HConstants.ZOOKEEPER_QUORUM; + public final static String HBASE_ZNODE_PARENT = HConstants.ZOOKEEPER_ZNODE_PARENT; + + /** + * 【必选】writer要写入的表的表名 + */ + public final static String TABLE = "table"; + + /** + * 【必选】列配置 + */ + public final static String COLUMN = "column"; + +} diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings.properties b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings.properties new file mode 100644 index 00000000..8fde2c7f --- /dev/null +++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings.properties @@ -0,0 +1,32 @@ +errorcode.required_value=\u60A8\u7F3A\u5931\u4E86\u5FC5\u987B\u586B\u5199\u7684\u53C2\u6570\u503C. +errorcode.illegal_value=\u60A8\u586B\u5199\u7684\u53C2\u6570\u503C\u4E0D\u5408\u6CD5. +errorcode.get_phoenix_table_columns_error=\u83B7\u53D6\u8868\u7684\u5217\u51FA\u9519. +errorcode.get_phoenix_connectioninfo_error=\u83B7\u53D6phoenix\u7684connectioninfo\u51FA\u9519. +errorcode.get_phoenix_splits_error=\u83B7\u53D6phoenix\u7684split\u4FE1\u606F\u65F6\u51FA\u9519. +errorcode.get_phoenix_createreader_error=\u521B\u5EFAphoenix\u7684split\u7684reader\u65F6\u51FA\u9519. +errorcode.get_phoenix_readerinit_error=phoenix\u7684split\u7684reader\u521D\u59CB\u5316\u65F6\u51FA\u9519. +errorcode.get_phoenix_column_typeconvert_error=\u5C06phoenix\u5217\u7684\u7C7B\u578B\u8F6C\u6362\u4E3Adatax\u7684\u7C7B\u578B\u65F6\u51FA\u9519. +errorcode.get_phoenix_record_read_error=\u8BFB\u53D6phoenix\u5177\u4F53\u7684\u4E00\u884C\u65F6\u51FA\u9519. +errorcode.get_phoenix_reader_close_error=\u5173\u95EDphoenix\u3000reader\u65F6\u51FA\u9519. 
+ + +sqlhelper.1=\u901A\u8FC7zkURL\u83B7\u53D6phoenix\u7684connectioninfo\u51FA\u9519\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u670D\u52A1\u662F\u5426\u6B63\u5E38 +sqlhelper.2=\u83B7\u53D6\u8868\u7684split\u4FE1\u606F\u65F6\u51FA\u73B0\u4E86\u5F02\u5E38\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u670D\u52A1\u662F\u5426\u6B63\u5E38 +sqlhelper.3=\u83B7\u53D6\u8868\u7684split\u4FE1\u606F\u65F6\u88AB\u4E2D\u65AD\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u8054\u7CFBdatax\u7BA1\u7406\u5458 + + +sqlreadertask.1=\u83B7\u53D6\u8868\u7684\u5217\u51FA\u95EE\u9898\uFF0C\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.2=\u521B\u5EFAphoenix\u7684reader\u51FA\u73B0\u95EE\u9898,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.3=phoenix\u7684reader\u521D\u59CB\u5316\u51FA\u73B0\u95EE\u9898,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.4=phoenix\u7684reader\u521D\u59CB\u5316\u88AB\u4E2D\u65AD,\u8BF7\u91CD\u8BD5 +sqlreadertask.5=\u9047\u5230\u4E0D\u53EF\u8BC6\u522B\u7684phoenix\u7C7B\u578B\uFF0C\u8BF7\u8054\u7CFBhbase\u7BA1\u7406\u5458 +sqlreadertask.6=\u8BFB\u53D6phoenix\u7684record\u65F6\u51FA\u73B0\u95EE\u9898\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.7=\u8BFB\u53D6phoenix\u7684record\u65F6\u51FA\u73B0\u95EE\u9898\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.8=phoenix\u7684reader close\u5931\u8D25,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 + +hbaseconfig.1=hbase\u7684\u914D\u7F6E\u4FE1\u606F\u4E0D\u80FD\u4E3A\u7A7A +hbaseconfig.2=hbase\u7684\u914D\u7F6E\u4FE1\u606F\u6709\u95EE\u9898\uFF0C\u8BF7\u53C2\u8003\u6587\u6863\u68C0\u67E5\u4E0B +hbaseconfig.3=zkquorum\u4E0D\u80FD\u4E3A\u7A7A +hbaseconfig.5=table\u7684\u540D\u5B57\u4E0D\u80FD\u4E3A\u7A7A +hbaseconfig.6=column\u53C2\u6570\u6CA1\u6709\u914D\u7F6E +hbaseconfig.7=\u4ECEphoenix\u83B7\u53D6column\u51FA\u9519\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings_en_US.properties b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings_en_US.properties new file mode 100644 index 00000000..8fde2c7f --- /dev/null +++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings_en_US.properties @@ -0,0 +1,32 @@ +errorcode.required_value=\u60A8\u7F3A\u5931\u4E86\u5FC5\u987B\u586B\u5199\u7684\u53C2\u6570\u503C. +errorcode.illegal_value=\u60A8\u586B\u5199\u7684\u53C2\u6570\u503C\u4E0D\u5408\u6CD5. +errorcode.get_phoenix_table_columns_error=\u83B7\u53D6\u8868\u7684\u5217\u51FA\u9519. +errorcode.get_phoenix_connectioninfo_error=\u83B7\u53D6phoenix\u7684connectioninfo\u51FA\u9519. +errorcode.get_phoenix_splits_error=\u83B7\u53D6phoenix\u7684split\u4FE1\u606F\u65F6\u51FA\u9519. +errorcode.get_phoenix_createreader_error=\u521B\u5EFAphoenix\u7684split\u7684reader\u65F6\u51FA\u9519. +errorcode.get_phoenix_readerinit_error=phoenix\u7684split\u7684reader\u521D\u59CB\u5316\u65F6\u51FA\u9519. +errorcode.get_phoenix_column_typeconvert_error=\u5C06phoenix\u5217\u7684\u7C7B\u578B\u8F6C\u6362\u4E3Adatax\u7684\u7C7B\u578B\u65F6\u51FA\u9519. 
+errorcode.get_phoenix_record_read_error=\u8BFB\u53D6phoenix\u5177\u4F53\u7684\u4E00\u884C\u65F6\u51FA\u9519. +errorcode.get_phoenix_reader_close_error=\u5173\u95EDphoenix\u3000reader\u65F6\u51FA\u9519. + + +sqlhelper.1=\u901A\u8FC7zkURL\u83B7\u53D6phoenix\u7684connectioninfo\u51FA\u9519\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u670D\u52A1\u662F\u5426\u6B63\u5E38 +sqlhelper.2=\u83B7\u53D6\u8868\u7684split\u4FE1\u606F\u65F6\u51FA\u73B0\u4E86\u5F02\u5E38\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u670D\u52A1\u662F\u5426\u6B63\u5E38 +sqlhelper.3=\u83B7\u53D6\u8868\u7684split\u4FE1\u606F\u65F6\u88AB\u4E2D\u65AD\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u8054\u7CFBdatax\u7BA1\u7406\u5458 + + +sqlreadertask.1=\u83B7\u53D6\u8868\u7684\u5217\u51FA\u95EE\u9898\uFF0C\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.2=\u521B\u5EFAphoenix\u7684reader\u51FA\u73B0\u95EE\u9898,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.3=phoenix\u7684reader\u521D\u59CB\u5316\u51FA\u73B0\u95EE\u9898,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.4=phoenix\u7684reader\u521D\u59CB\u5316\u88AB\u4E2D\u65AD,\u8BF7\u91CD\u8BD5 +sqlreadertask.5=\u9047\u5230\u4E0D\u53EF\u8BC6\u522B\u7684phoenix\u7C7B\u578B\uFF0C\u8BF7\u8054\u7CFBhbase\u7BA1\u7406\u5458 +sqlreadertask.6=\u8BFB\u53D6phoenix\u7684record\u65F6\u51FA\u73B0\u95EE\u9898\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.7=\u8BFB\u53D6phoenix\u7684record\u65F6\u51FA\u73B0\u95EE\u9898\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.8=phoenix\u7684reader close\u5931\u8D25,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 + +hbaseconfig.1=hbase\u7684\u914D\u7F6E\u4FE1\u606F\u4E0D\u80FD\u4E3A\u7A7A +hbaseconfig.2=hbase\u7684\u914D\u7F6E\u4FE1\u606F\u6709\u95EE\u9898\uFF0C\u8BF7\u53C2\u8003\u6587\u6863\u68C0\u67E5\u4E0B +hbaseconfig.3=zkquorum\u4E0D\u80FD\u4E3A\u7A7A +hbaseconfig.5=table\u7684\u540D\u5B57\u4E0D\u80FD\u4E3A\u7A7A +hbaseconfig.6=column\u53C2\u6570\u6CA1\u6709\u914D\u7F6E +hbaseconfig.7=\u4ECEphoenix\u83B7\u53D6column\u51FA\u9519\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings_ja_JP.properties b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings_ja_JP.properties new file mode 100644 index 00000000..8fde2c7f --- /dev/null +++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings_ja_JP.properties @@ -0,0 +1,32 @@ +errorcode.required_value=\u60A8\u7F3A\u5931\u4E86\u5FC5\u987B\u586B\u5199\u7684\u53C2\u6570\u503C. +errorcode.illegal_value=\u60A8\u586B\u5199\u7684\u53C2\u6570\u503C\u4E0D\u5408\u6CD5. +errorcode.get_phoenix_table_columns_error=\u83B7\u53D6\u8868\u7684\u5217\u51FA\u9519. +errorcode.get_phoenix_connectioninfo_error=\u83B7\u53D6phoenix\u7684connectioninfo\u51FA\u9519. +errorcode.get_phoenix_splits_error=\u83B7\u53D6phoenix\u7684split\u4FE1\u606F\u65F6\u51FA\u9519. +errorcode.get_phoenix_createreader_error=\u521B\u5EFAphoenix\u7684split\u7684reader\u65F6\u51FA\u9519. 
+errorcode.get_phoenix_readerinit_error=phoenix\u7684split\u7684reader\u521D\u59CB\u5316\u65F6\u51FA\u9519. +errorcode.get_phoenix_column_typeconvert_error=\u5C06phoenix\u5217\u7684\u7C7B\u578B\u8F6C\u6362\u4E3Adatax\u7684\u7C7B\u578B\u65F6\u51FA\u9519. +errorcode.get_phoenix_record_read_error=\u8BFB\u53D6phoenix\u5177\u4F53\u7684\u4E00\u884C\u65F6\u51FA\u9519. +errorcode.get_phoenix_reader_close_error=\u5173\u95EDphoenix\u3000reader\u65F6\u51FA\u9519. + + +sqlhelper.1=\u901A\u8FC7zkURL\u83B7\u53D6phoenix\u7684connectioninfo\u51FA\u9519\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u670D\u52A1\u662F\u5426\u6B63\u5E38 +sqlhelper.2=\u83B7\u53D6\u8868\u7684split\u4FE1\u606F\u65F6\u51FA\u73B0\u4E86\u5F02\u5E38\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u670D\u52A1\u662F\u5426\u6B63\u5E38 +sqlhelper.3=\u83B7\u53D6\u8868\u7684split\u4FE1\u606F\u65F6\u88AB\u4E2D\u65AD\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u8054\u7CFBdatax\u7BA1\u7406\u5458 + + +sqlreadertask.1=\u83B7\u53D6\u8868\u7684\u5217\u51FA\u95EE\u9898\uFF0C\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.2=\u521B\u5EFAphoenix\u7684reader\u51FA\u73B0\u95EE\u9898,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.3=phoenix\u7684reader\u521D\u59CB\u5316\u51FA\u73B0\u95EE\u9898,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.4=phoenix\u7684reader\u521D\u59CB\u5316\u88AB\u4E2D\u65AD,\u8BF7\u91CD\u8BD5 +sqlreadertask.5=\u9047\u5230\u4E0D\u53EF\u8BC6\u522B\u7684phoenix\u7C7B\u578B\uFF0C\u8BF7\u8054\u7CFBhbase\u7BA1\u7406\u5458 +sqlreadertask.6=\u8BFB\u53D6phoenix\u7684record\u65F6\u51FA\u73B0\u95EE\u9898\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.7=\u8BFB\u53D6phoenix\u7684record\u65F6\u51FA\u73B0\u95EE\u9898\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.8=phoenix\u7684reader close\u5931\u8D25,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 + +hbaseconfig.1=hbase\u7684\u914D\u7F6E\u4FE1\u606F\u4E0D\u80FD\u4E3A\u7A7A +hbaseconfig.2=hbase\u7684\u914D\u7F6E\u4FE1\u606F\u6709\u95EE\u9898\uFF0C\u8BF7\u53C2\u8003\u6587\u6863\u68C0\u67E5\u4E0B +hbaseconfig.3=zkquorum\u4E0D\u80FD\u4E3A\u7A7A +hbaseconfig.5=table\u7684\u540D\u5B57\u4E0D\u80FD\u4E3A\u7A7A +hbaseconfig.6=column\u53C2\u6570\u6CA1\u6709\u914D\u7F6E +hbaseconfig.7=\u4ECEphoenix\u83B7\u53D6column\u51FA\u9519\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings_zh_CN.properties b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings_zh_CN.properties new file mode 100644 index 00000000..8fde2c7f --- /dev/null +++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/LocalStrings_zh_CN.properties @@ -0,0 +1,32 @@ +errorcode.required_value=\u60A8\u7F3A\u5931\u4E86\u5FC5\u987B\u586B\u5199\u7684\u53C2\u6570\u503C. +errorcode.illegal_value=\u60A8\u586B\u5199\u7684\u53C2\u6570\u503C\u4E0D\u5408\u6CD5. +errorcode.get_phoenix_table_columns_error=\u83B7\u53D6\u8868\u7684\u5217\u51FA\u9519. +errorcode.get_phoenix_connectioninfo_error=\u83B7\u53D6phoenix\u7684connectioninfo\u51FA\u9519. 
+errorcode.get_phoenix_splits_error=\u83B7\u53D6phoenix\u7684split\u4FE1\u606F\u65F6\u51FA\u9519. +errorcode.get_phoenix_createreader_error=\u521B\u5EFAphoenix\u7684split\u7684reader\u65F6\u51FA\u9519. +errorcode.get_phoenix_readerinit_error=phoenix\u7684split\u7684reader\u521D\u59CB\u5316\u65F6\u51FA\u9519. +errorcode.get_phoenix_column_typeconvert_error=\u5C06phoenix\u5217\u7684\u7C7B\u578B\u8F6C\u6362\u4E3Adatax\u7684\u7C7B\u578B\u65F6\u51FA\u9519. +errorcode.get_phoenix_record_read_error=\u8BFB\u53D6phoenix\u5177\u4F53\u7684\u4E00\u884C\u65F6\u51FA\u9519. +errorcode.get_phoenix_reader_close_error=\u5173\u95EDphoenix\u3000reader\u65F6\u51FA\u9519. + + +sqlhelper.1=\u901A\u8FC7zkURL\u83B7\u53D6phoenix\u7684connectioninfo\u51FA\u9519\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u670D\u52A1\u662F\u5426\u6B63\u5E38 +sqlhelper.2=\u83B7\u53D6\u8868\u7684split\u4FE1\u606F\u65F6\u51FA\u73B0\u4E86\u5F02\u5E38\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u670D\u52A1\u662F\u5426\u6B63\u5E38 +sqlhelper.3=\u83B7\u53D6\u8868\u7684split\u4FE1\u606F\u65F6\u88AB\u4E2D\u65AD\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u8054\u7CFBdatax\u7BA1\u7406\u5458 + + +sqlreadertask.1=\u83B7\u53D6\u8868\u7684\u5217\u51FA\u95EE\u9898\uFF0C\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.2=\u521B\u5EFAphoenix\u7684reader\u51FA\u73B0\u95EE\u9898,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.3=phoenix\u7684reader\u521D\u59CB\u5316\u51FA\u73B0\u95EE\u9898,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.4=phoenix\u7684reader\u521D\u59CB\u5316\u88AB\u4E2D\u65AD,\u8BF7\u91CD\u8BD5 +sqlreadertask.5=\u9047\u5230\u4E0D\u53EF\u8BC6\u522B\u7684phoenix\u7C7B\u578B\uFF0C\u8BF7\u8054\u7CFBhbase\u7BA1\u7406\u5458 +sqlreadertask.6=\u8BFB\u53D6phoenix\u7684record\u65F6\u51FA\u73B0\u95EE\u9898\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.7=\u8BFB\u53D6phoenix\u7684record\u65F6\u51FA\u73B0\u95EE\u9898\uFF0C\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 +sqlreadertask.8=phoenix\u7684reader close\u5931\u8D25,\u8BF7\u91CD\u8BD5\uFF0C\u82E5\u8FD8\u6709\u95EE\u9898\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 + +hbaseconfig.1=hbase\u7684\u914D\u7F6E\u4FE1\u606F\u4E0D\u80FD\u4E3A\u7A7A +hbaseconfig.2=hbase\u7684\u914D\u7F6E\u4FE1\u606F\u6709\u95EE\u9898\uFF0C\u8BF7\u53C2\u8003\u6587\u6863\u68C0\u67E5\u4E0B +hbaseconfig.3=zkquorum\u4E0D\u80FD\u4E3A\u7A7A +hbaseconfig.5=table\u7684\u540D\u5B57\u4E0D\u80FD\u4E3A\u7A7A +hbaseconfig.6=column\u53C2\u6570\u6CA1\u6709\u914D\u7F6E +hbaseconfig.7=\u4ECEphoenix\u83B7\u53D6column\u51FA\u9519\uFF0C\u8BF7\u68C0\u67E5hbase\u96C6\u7FA4\u72B6\u6001 diff --git a/hbase11xsqlreader/src/main/resources/plugin.json b/hbase11xsqlreader/src/main/resources/plugin.json new file mode 100644 index 00000000..e245ca27 --- /dev/null +++ b/hbase11xsqlreader/src/main/resources/plugin.json @@ -0,0 +1,7 @@ +{ + "name": "hbase11xsqlreader", + "class": "com.alibaba.datax.plugin.reader.hbase11xsqlreader.HbaseSQLReader", + "description": "useScene: prod. 
mechanism: Scan to read data.", + "developer": "liwei.li, bug reported to : liwei.li@alibaba-inc.com" +} + diff --git a/hbase11xsqlreader/src/main/resources/plugin_job_template.json b/hbase11xsqlreader/src/main/resources/plugin_job_template.json new file mode 100644 index 00000000..7f864d76 --- /dev/null +++ b/hbase11xsqlreader/src/main/resources/plugin_job_template.json @@ -0,0 +1,13 @@ +{ + "name": "hbase11sqlreader", + "parameter": { + "hbaseConfig": { + "hbase.zookeeper.quorum": "hb-proxy-pub-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-003.hbase.rds.aliyuncs.com" + }, + "table": "TABLE1", + "column": [ + "ID", + "COL1" + ] + } +} diff --git a/hbase11xsqlreader/src/test/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelperTest.java b/hbase11xsqlreader/src/test/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelperTest.java new file mode 100644 index 00000000..be0bc6f2 --- /dev/null +++ b/hbase11xsqlreader/src/test/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelperTest.java @@ -0,0 +1,40 @@ +package com.alibaba.datax.plugin.reader.hbase11xsqlreader; + +import com.alibaba.datax.common.util.Configuration; +import org.junit.Test; + +import java.util.List; + +import static junit.framework.Assert.assertEquals; + +/** + * Created by shf on 16/7/20. + */ +public class HbaseSQLHelperTest { + + private String jsonStr = "{\n" + + " \"hbaseConfig\": {\n" + + " \"hbase.zookeeper.quorum\": \"hb-proxy-pub-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-003.hbase.rds.aliyuncs.com\"\n" + + " },\n" + + " \"table\": \"TABLE1\",\n" + + " \"column\": []\n" + + " }"; + + + @Test + public void testParseConfig() { + Configuration config = Configuration.from(jsonStr); + HbaseSQLReaderConfig readerConfig = HbaseSQLHelper.parseConfig(config); + System.out.println("tablenae = " +readerConfig.getTableName() +",zk = " +readerConfig.getZkUrl()); + assertEquals("TABLE1", readerConfig.getTableName()); + assertEquals("hb-proxy-pub-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-003.hbase.rds.aliyuncs.com:2181", readerConfig.getZkUrl()); + } + + @Test + public void testSplit() { + Configuration config = Configuration.from(jsonStr); + HbaseSQLReaderConfig readerConfig = HbaseSQLHelper.parseConfig(config); + List splits = HbaseSQLHelper.split(readerConfig); + System.out.println("split size = " + splits.size()); + } +} diff --git a/hbase11xsqlreader/src/test/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTaskTest.java b/hbase11xsqlreader/src/test/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTaskTest.java new file mode 100644 index 00000000..a7e3c31b --- /dev/null +++ b/hbase11xsqlreader/src/test/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTaskTest.java @@ -0,0 +1,78 @@ +package com.alibaba.datax.plugin.reader.hbase11xsqlreader; + +import com.alibaba.datax.common.element.*; +import com.alibaba.datax.common.plugin.RecordSender; +import com.alibaba.datax.common.util.Configuration; +import com.alibaba.datax.core.transport.record.DefaultRecord; +import org.junit.Test; + +import java.io.IOException; +import java.util.List; + +import static junit.framework.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class HbaseSQLReaderTaskTest { + + private String jsonStr = "{\n" + + " \"hbaseConfig\": {\n" + + " 
\"hbase.zookeeper.quorum\": \"hb-proxy-pub-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-pub-xxx-003.hbase.rds.aliyuncs.com\"\n" + + " },\n" + + " \"table\": \"TABLE1\",\n" + + " \"column\": []\n" + + " }"; + + private List generateSplitConfig() throws IOException, InterruptedException { + Configuration config = Configuration.from(jsonStr); + HbaseSQLReaderConfig readerConfig = HbaseSQLHelper.parseConfig(config); + List splits = HbaseSQLHelper.split(readerConfig); + System.out.println("split size = " + splits.size()); + return splits; + } + + @Test + public void testReadRecord() throws Exception { + List splits = this.generateSplitConfig(); + + int allRecordNum = 0; + for (int i = 0; i < splits.size(); i++) { + RecordSender recordSender = mock(RecordSender.class); + when(recordSender.createRecord()).thenReturn(new DefaultRecord()); + Record record = recordSender.createRecord(); + + HbaseSQLReaderTask hbase11SQLReaderTask = new HbaseSQLReaderTask(splits.get(i)); + hbase11SQLReaderTask.init(); + hbase11SQLReaderTask.prepare(); + + int num = 0; + while (true) { + boolean hasLine = false; + try { + hasLine = hbase11SQLReaderTask.readRecord(record); + } catch (Exception e) { + e.printStackTrace(); + throw e; + } + if (!hasLine) + break; + num++; + if (num % 100 == 0) + System.out.println("record num is :" + num + ",record is " + record.toString()); + when(recordSender.createRecord()).thenReturn(new DefaultRecord()); + String recordStr = ""; + for (int j = 0; j < record.getColumnNumber(); j++) { + recordStr += record.getColumn(j).asString() + ","; + } + recordSender.sendToWriter(record); + record = recordSender.createRecord(); + } + System.out.println("split id is " + i + ",record num = " + num); + allRecordNum += num; + recordSender.flush(); + hbase11SQLReaderTask.destroy(); + } + System.out.println("all record num = " + allRecordNum); + assertEquals(10000, allRecordNum); + } +} diff --git a/package.xml b/package.xml index 7ec2389d..ae07dd08 100755 --- a/package.xml +++ b/package.xml @@ -294,6 +294,13 @@ datax + + hbase11xsqlreader/target/datax/ + + **/*.* + + datax + elasticsearchwriter/target/datax/ diff --git a/pom.xml b/pom.xml index 9ad28b9b..f8bc7434 100755 --- a/pom.xml +++ b/pom.xml @@ -83,6 +83,7 @@ hbase11xwriter hbase094xwriter hbase11xsqlwriter + hbase11xsqlreader elasticsearchwriter