diff --git a/README.md b/README.md
index 60370693..f12bb021 100644
--- a/README.md
+++ b/README.md
@@ -47,6 +47,7 @@ DataX目前已经有了比较全面的插件体系,主流的RDBMS数据库、N
| NoSQL数据存储 | OTS | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/otsreader/doc/otsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/otswriter/doc/otswriter.md)|
| | Hbase0.94 | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/hbase094xreader/doc/hbase094xreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase094xwriter/doc/hbase094xwriter.md)|
| | Hbase1.1 | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/hbase11xreader/doc/hbase11xreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase11xwriter/doc/hbase11xwriter.md)|
+| | Phoenix4.x | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/hbase11xsqlreader/doc/hbase11xsqlreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hbase11xsqlwriter/doc/hbase11xsqlwriter.md)|
| | MongoDB | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/mongoreader/doc/mongoreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/mongowriter/doc/mongowriter.md)|
| | Hive | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/hdfsreader/doc/hdfsreader.md) 、[写](https://github.com/alibaba/DataX/blob/master/hdfswriter/doc/hdfswriter.md)|
| 无结构化数据存储 | TxtFile | √ | √ |[读](https://github.com/alibaba/DataX/blob/master/txtfilereader/doc/txtfilereader.md) 、[写](https://github.com/alibaba/DataX/blob/master/txtfilewriter/doc/txtfilewriter.md)|
diff --git a/hbase11xsqlreader/doc/hbase11xsqlreader.md b/hbase11xsqlreader/doc/hbase11xsqlreader.md
new file mode 100644
index 00000000..03261a1f
--- /dev/null
+++ b/hbase11xsqlreader/doc/hbase11xsqlreader.md
@@ -0,0 +1,252 @@
+# hbase11xsqlreader Plugin Documentation
+
+
+___
+
+
+
+## 1 Quick Introduction
+
+The hbase11xsqlreader plugin reads data from Phoenix (SQL on HBase). Under the hood, hbase11xsqlreader connects to the remote HBase cluster through the Phoenix client and executes the corresponding SQL statements to SELECT the data out of Phoenix.
+
+
+## 2 Implementation Principle
+
+In short, hbase11xsqlreader connects to the remote HBase cluster through the Phoenix client, builds a SELECT statement from the user's configuration, sends it to the HBase cluster, assembles the returned results into an abstract data set using DataX's internal data types, and passes that data set to the downstream Writer.
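+
+Conceptually this is similar to the hedged sketch below (illustrative only, not the plugin's actual code: the ZooKeeper hosts are placeholders and the query assumes Phoenix's standard US_POPULATION sample table; the real reader parallelizes the work over Phoenix MapReduce input splits instead of issuing one JDBC query):
+
+```
+// Illustrative sketch of what hbase11xsqlreader does conceptually.
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.ResultSet;
+import java.sql.Statement;
+
+public class PhoenixSelectSketch {
+    public static void main(String[] args) throws Exception {
+        // The Phoenix JDBC URL is built from the HBase cluster's ZooKeeper quorum (placeholder hosts).
+        Connection con = DriverManager.getConnection("jdbc:phoenix:zk-host-001,zk-host-002,zk-host-003");
+        Statement stmt = con.createStatement();
+        // The reader generates a SELECT over the configured table and columns.
+        ResultSet rs = stmt.executeQuery("SELECT STATE, CITY, POPULATION FROM US_POPULATION");
+        while (rs.next()) {
+            // Each row would become one DataX Record handed to the downstream Writer.
+            System.out.println(rs.getString("STATE") + "," + rs.getString("CITY") + "," + rs.getLong("POPULATION"));
+        }
+        rs.close();
+        stmt.close();
+        con.close();
+    }
+}
+```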
+
+
+## 3 Function Description
+
+### 3.1 Configuration Sample
+
+* A job that reads data from Phoenix and prints it to the local console:
+
+```
+{
+ "job": {
+ "setting": {
+ "speed": {
+                        //Transfer speed limit in byte/s. DataX tries to approach but not exceed this value.
+ "byte":10485760
+ },
+                  //Error limits
+ "errorLimit": {
+                        //Maximum number of failed records; the job is reported as failed once this value is exceeded.
+ "record": 0,
+                        //Maximum ratio of failed records: 1.0 means 100%, 0.02 means 2%.
+ "percentage": 0.02
+ }
+ },
+ "content": [ {
+ "reader": {
+            //use the hbase11xsqlreader plugin
+ "name": "hbase11xsqlreader",
+ "parameter": {
+                //ZooKeeper quorum of the HBase cluster that Phoenix runs on
+ "hbaseConfig": {
+ "hbase.zookeeper.quorum": "hb-proxy-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-xxx-003.hbase.rds.aliyuncs.com"
+ },
+                //name of the Phoenix table to read
+ "table": "US_POPULATION",
+                //columns to read; leave empty to read all columns
+ "column": [
+ ]
+ }
+ },
+ "writer": {
+            //writer type
+ "name": "streamwriter",
+            //whether to print the records
+ "parameter": {
+ "print":true,
+ "encoding": "UTF-8"
+ }
+ }
+ }
+ ]
+ }
+}
+```
+
+
+### 3.2 Parameter Description
+
+* **hbaseConfig**
+
+	* Description: hbase11xsqlreader connects to the HBase cluster through the Phoenix client, so this field must be filled with the ZooKeeper quorum address of that HBase cluster. Note that the ZooKeeper port (2181) must not be appended; see the sketch after this list for how the quorum is turned into a Phoenix JDBC URL.
+
+	* Required: yes
+
+	* Default: none
+
+* **table**
+
+	* Description: name of the table in Phoenix. If the table has a namespace, set this value to 'namespace.tablename'.
+
+	* Required: yes
+
+	* Default: none
+
+* **column**
+
+	* Description: the set of column names to read from the Phoenix table, described as a JSON array. An empty array means all columns are read.
+
+	* Required: yes
+
+	* Default: none
+
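+As a hedged illustration of the hbaseConfig value (placeholder host names, not the plugin's actual code), the quorum maps to a Phoenix JDBC URL like this:
+
+```
+// Hypothetical example: the ZooKeeper quorum from hbaseConfig, without the 2181 port.
+String quorum = "zk-host-001,zk-host-002,zk-host-003";
+// The Phoenix client supplies the ZooKeeper port itself when it is omitted from the URL.
+String jdbcUrl = "jdbc:phoenix:" + quorum;
+```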
+
+### 3.3 Type Conversion
+
+hbase11xsqlreader currently supports most Phoenix data types, but a few types are not supported yet, so please check the types used by your tables.
+
+The type conversion list of hbase11xsqlreader for Phoenix is as follows:
+
+
+| DataX internal type | Phoenix data type |
+| -------- | ----- |
+| String | CHAR, VARCHAR |
+| Bytes | BINARY, VARBINARY |
+| Bool | BOOLEAN |
+| Long | INTEGER, TINYINT, SMALLINT, BIGINT |
+| Double | FLOAT, DECIMAL, DOUBLE |
+| Date | DATE, TIME, TIMESTAMP |
+
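+For reference, the sketch below shows how a value read over JDBC could be wrapped into a DataX column according to the table above (illustrative only, not the plugin's actual conversion code; it assumes DataX's common column classes such as LongColumn and StringColumn):
+
+```
+import com.alibaba.datax.common.element.BoolColumn;
+import com.alibaba.datax.common.element.BytesColumn;
+import com.alibaba.datax.common.element.Column;
+import com.alibaba.datax.common.element.DateColumn;
+import com.alibaba.datax.common.element.DoubleColumn;
+import com.alibaba.datax.common.element.LongColumn;
+import com.alibaba.datax.common.element.StringColumn;
+
+import java.sql.ResultSet;
+import java.sql.SQLException;
+import java.sql.Types;
+
+public class PhoenixTypeMappingSketch {
+    // Wraps one column of the current ResultSet row into a DataX Column, following the table above.
+    // Null handling is omitted to keep the sketch short.
+    public static Column toDataXColumn(ResultSet rs, int index, int sqlType) throws SQLException {
+        switch (sqlType) {
+            case Types.CHAR:
+            case Types.VARCHAR:
+                return new StringColumn(rs.getString(index));
+            case Types.BINARY:
+            case Types.VARBINARY:
+                return new BytesColumn(rs.getBytes(index));
+            case Types.BOOLEAN:
+                return new BoolColumn(rs.getBoolean(index));
+            case Types.TINYINT:
+            case Types.SMALLINT:
+            case Types.INTEGER:
+            case Types.BIGINT:
+                return new LongColumn(rs.getLong(index));
+            case Types.FLOAT:
+            case Types.DOUBLE:
+            case Types.DECIMAL:
+                return new DoubleColumn(rs.getDouble(index));
+            case Types.DATE:
+            case Types.TIME:
+            case Types.TIMESTAMP:
+                return new DateColumn(rs.getTimestamp(index));
+            default:
+                throw new SQLException("Unsupported Phoenix type: " + sqlType);
+        }
+    }
+}
+```
+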
+
+
+## 4 Performance Report
+
+Omitted.
+
+## 5 Constraints and Limitations
+
+Omitted.
+
+## 6 FAQ
+
+***
+
diff --git a/hbase11xsqlreader/pom.xml b/hbase11xsqlreader/pom.xml
new file mode 100644
index 00000000..6ea727c0
--- /dev/null
+++ b/hbase11xsqlreader/pom.xml
@@ -0,0 +1,109 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<project xmlns="http://maven.apache.org/POM/4.0.0"
+         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <parent>
+        <groupId>com.alibaba.datax</groupId>
+        <artifactId>datax-all</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+    <artifactId>hbase11xsqlreader</artifactId>
+    <name>hbase11xsqlreader</name>
+    <version>0.0.1-SNAPSHOT</version>
+    <packaging>jar</packaging>
+
+    <properties>
+        <phoenix.version>4.12.0-AliHBase-1.1-0.5</phoenix.version>
+    </properties>
+
+    <dependencies>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-common</artifactId>
+            <version>${datax-project-version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>slf4j-log4j12</artifactId>
+                    <groupId>org.slf4j</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+        <dependency>
+            <groupId>com.aliyun.phoenix</groupId>
+            <artifactId>ali-phoenix-core</artifactId>
+            <version>${phoenix.version}</version>
+            <exclusions>
+                <exclusion>
+                    <artifactId>servlet-api</artifactId>
+                    <groupId>javax.servlet</groupId>
+                </exclusion>
+            </exclusions>
+        </dependency>
+
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.mockito</groupId>
+            <artifactId>mockito-core</artifactId>
+            <version>2.0.44-beta</version>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.alibaba.datax</groupId>
+            <artifactId>datax-core</artifactId>
+            <version>${datax-project-version}</version>
+            <exclusions>
+                <exclusion>
+                    <groupId>com.alibaba.datax</groupId>
+                    <artifactId>datax-service-face</artifactId>
+                </exclusion>
+            </exclusions>
+            <scope>test</scope>
+        </dependency>
+    </dependencies>
+
+    <build>
+        <resources>
+            <resource>
+                <directory>src/main/java</directory>
+                <includes>
+                    <include>**/*.properties</include>
+                </includes>
+            </resource>
+        </resources>
+        <plugins>
+            <plugin>
+                <artifactId>maven-compiler-plugin</artifactId>
+                <configuration>
+                    <source>1.6</source>
+                    <target>1.6</target>
+                    <encoding>${project-sourceEncoding}</encoding>
+                </configuration>
+            </plugin>
+            <plugin>
+                <artifactId>maven-assembly-plugin</artifactId>
+                <configuration>
+                    <descriptors>
+                        <descriptor>src/main/assembly/package.xml</descriptor>
+                    </descriptors>
+                    <finalName>datax</finalName>
+                </configuration>
+                <executions>
+                    <execution>
+                        <id>dwzip</id>
+                        <phase>package</phase>
+                        <goals>
+                            <goal>single</goal>
+                        </goals>
+                    </execution>
+                </executions>
+            </plugin>
+        </plugins>
+    </build>
+</project>
diff --git a/hbase11xsqlreader/src/main/assembly/package.xml b/hbase11xsqlreader/src/main/assembly/package.xml
new file mode 100644
index 00000000..b5c1e19b
--- /dev/null
+++ b/hbase11xsqlreader/src/main/assembly/package.xml
@@ -0,0 +1,35 @@
+<assembly xmlns="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0"
+          xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+          xsi:schemaLocation="http://maven.apache.org/plugins/maven-assembly-plugin/assembly/1.1.0 http://maven.apache.org/xsd/assembly-1.1.0.xsd">
+    <id></id>
+    <formats>
+        <format>dir</format>
+    </formats>
+    <includeBaseDirectory>false</includeBaseDirectory>
+    <fileSets>
+        <fileSet>
+            <directory>src/main/resources</directory>
+            <includes>
+                <include>plugin.json</include>
+                <include>plugin_job_template.json</include>
+            </includes>
+            <outputDirectory>plugin/reader/hbase11xsqlreader</outputDirectory>
+        </fileSet>
+        <fileSet>
+            <directory>target/</directory>
+            <includes>
+                <include>hbase11xsqlreader-0.0.1-SNAPSHOT.jar</include>
+            </includes>
+            <outputDirectory>plugin/reader/hbase11xsqlreader</outputDirectory>
+        </fileSet>
+    </fileSets>
+
+    <dependencySets>
+        <dependencySet>
+            <useProjectArtifact>false</useProjectArtifact>
+            <outputDirectory>plugin/reader/hbase11xsqlreader/libs</outputDirectory>
+            <scope>runtime</scope>
+        </dependencySet>
+    </dependencySets>
+</assembly>
diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HadoopSerializationUtil.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HadoopSerializationUtil.java
new file mode 100644
index 00000000..45907123
--- /dev/null
+++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HadoopSerializationUtil.java
@@ -0,0 +1,31 @@
+package com.alibaba.datax.plugin.reader.hbase11xsqlreader;
+
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
+import org.apache.hadoop.io.Writable;
+
+public class HadoopSerializationUtil {
+
+    // Serializes a Hadoop Writable (e.g. a PhoenixInputSplit) into a byte array.
+    public static byte[] serialize(Writable writable) throws IOException {
+ ByteArrayOutputStream out = new ByteArrayOutputStream();
+ DataOutputStream dataout = new DataOutputStream(out);
+ writable.write(dataout);
+ dataout.close();
+ return out.toByteArray();
+ }
+
+    // Restores a Writable's fields from a byte array produced by serialize().
+    public static void deserialize(Writable writable, byte[] bytes) throws Exception {
+
+ ByteArrayInputStream in = new ByteArrayInputStream(bytes);
+ DataInputStream datain = new DataInputStream(in);
+ writable.readFields(datain);
+ datain.close();
+ }
+
+
+}
\ No newline at end of file
diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java
new file mode 100644
index 00000000..2aacdddf
--- /dev/null
+++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java
@@ -0,0 +1,125 @@
+package com.alibaba.datax.plugin.reader.hbase11xsqlreader;
+
+import com.alibaba.datax.common.exception.DataXException;
+import com.alibaba.datax.common.util.Configuration;
+import com.alibaba.fastjson.JSON;
+import com.alibaba.fastjson.TypeReference;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.JobID;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
+import org.apache.phoenix.jdbc.PhoenixConnection;
+import org.apache.phoenix.jdbc.PhoenixEmbeddedDriver;
+import org.apache.phoenix.mapreduce.PhoenixInputFormat;
+import org.apache.phoenix.mapreduce.PhoenixInputSplit;
+import org.apache.phoenix.mapreduce.PhoenixRecordWritable;
+import org.apache.phoenix.mapreduce.util.PhoenixConfigurationUtil;
+import org.apache.phoenix.schema.MetaDataClient;
+import org.apache.phoenix.schema.PColumn;
+import org.apache.phoenix.schema.PTable;
+import org.apache.phoenix.schema.SaltingUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+
+public class HbaseSQLHelper {
+ private static final Logger LOG = LoggerFactory.getLogger(HbaseSQLHelper.class);
+
+ public static org.apache.hadoop.conf.Configuration generatePhoenixConf(HbaseSQLReaderConfig readerConfig) {
+ org.apache.hadoop.conf.Configuration conf = new org.apache.hadoop.conf.Configuration();
+
+ String table = readerConfig.getTableName();
+        List<String> columns = readerConfig.getColumns();
+ String zkUrl = readerConfig.getZkUrl();
+
+ PhoenixConfigurationUtil.setInputClass(conf, PhoenixRecordWritable.class);
+ PhoenixConfigurationUtil.setInputTableName(conf, table);
+
+ if (!columns.isEmpty()) {
+ PhoenixConfigurationUtil.setSelectColumnNames(conf, columns.toArray(new String[columns.size()]));
+ }
+ PhoenixEmbeddedDriver.ConnectionInfo info = null;
+ try {
+ info = PhoenixEmbeddedDriver.ConnectionInfo.create(zkUrl);
+ } catch (SQLException e) {
+ throw DataXException.asDataXException(
+ HbaseSQLReaderErrorCode.GET_PHOENIX_CONNECTIONINFO_ERROR, "通过zkURL获取phoenix的connectioninfo出错,请检查hbase集群服务是否正常", e);
+ }
+ conf.set(HConstants.ZOOKEEPER_QUORUM, info.getZookeeperQuorum());
+ if (info.getPort() != null)
+ conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, info.getPort());
+ if (info.getRootNode() != null)
+ conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, info.getRootNode());
+ return conf;
+ }
+
+    public static List<String> getPColumnNames(String connectionString, String tableName) throws SQLException {
+ Connection con =
+ DriverManager.getConnection(connectionString);
+ PhoenixConnection phoenixConnection = con.unwrap(PhoenixConnection.class);
+ MetaDataClient metaDataClient = new MetaDataClient(phoenixConnection);
+ PTable table = metaDataClient.updateCache("", tableName).getTable();
+        List<String> columnNames = new ArrayList<String>();
+ for (PColumn pColumn : table.getColumns()) {
+ if (!pColumn.getName().getString().equals(SaltingUtil.SALTING_COLUMN_NAME))
+ columnNames.add(pColumn.getName().getString());
+ else
+ LOG.info(tableName + " is salt table");
+ }
+ return columnNames;
+ }
+
+
+    public static List<Configuration> split(HbaseSQLReaderConfig readerConfig) {
+ PhoenixInputFormat inputFormat = new PhoenixInputFormat();
+ org.apache.hadoop.conf.Configuration conf = generatePhoenixConf(readerConfig);
+ JobID jobId = new JobID(Key.MOCK_JOBID_IDENTIFIER, Key.MOCK_JOBID);
+ JobContextImpl jobContext = new JobContextImpl(conf, jobId);
+        List<Configuration> resultConfigurations = new ArrayList<Configuration>();
+        List<InputSplit> rawSplits = null;
+ try {
+ rawSplits = inputFormat.getSplits(jobContext);
+ LOG.info("split size is " + rawSplits.size());
+ for (InputSplit split : rawSplits) {
+ Configuration cfg = readerConfig.getOriginalConfig().clone();
+
+ byte[] splitSer = HadoopSerializationUtil.serialize((PhoenixInputSplit) split);
+ String splitBase64Str = org.apache.commons.codec.binary.Base64.encodeBase64String(splitSer);
+ cfg.set(Key.SPLIT_KEY, splitBase64Str);
+ resultConfigurations.add(cfg);
+ }
+ } catch (IOException e) {
+ throw DataXException.asDataXException(
+ HbaseSQLReaderErrorCode.GET_PHOENIX_SPLITS_ERROR, "获取表的split信息时出现了异常,请检查hbase集群服务是否正常," + e.getMessage(), e);
+ } catch (InterruptedException e) {
+ throw DataXException.asDataXException(
+ HbaseSQLReaderErrorCode.GET_PHOENIX_SPLITS_ERROR, "获取表的split信息时被中断,请重试,若还有问题请联系datax管理员," + e.getMessage(), e);
+ }
+
+ return resultConfigurations;
+ }
+
+ public static HbaseSQLReaderConfig parseConfig(Configuration cfg) {
+ return HbaseSQLReaderConfig.parse(cfg);
+ }
+
+ public static Pair getHbaseConfig(String hbaseCfgString) {
+ assert hbaseCfgString != null;
+ Map hbaseConfigMap = JSON.parseObject(hbaseCfgString, new TypeReference