Merge pull request #1606 from hadoop835/master

修复 Phoenix4版本同步数据添加where条件
This commit is contained in:
jtchen-study 2023-09-07 10:59:44 +08:00 committed by GitHub
commit 8db76ddd5d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
5 changed files with 104 additions and 24 deletions

View File

@ -61,11 +61,15 @@ hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实
"hbaseConfig": { "hbaseConfig": {
"hbase.zookeeper.quorum": "hb-proxy-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-xxx-003.hbase.rds.aliyuncs.com" "hbase.zookeeper.quorum": "hb-proxy-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-xxx-003.hbase.rds.aliyuncs.com"
}, },
//填写要读取的phoenix的命名空间
"schema": "TAG",
//填写要读取的phoenix的表名 //填写要读取的phoenix的表名
"table": "US_POPULATION", "table": "US_POPULATION",
//填写要读取的列名,不填读取所有列 //填写要读取的列名,不填读取所有列
"column": [ "column": [
] ],
//查询条件
"where": "id="
} }
}, },
"writer": { "writer": {
@ -92,11 +96,18 @@ hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实
* 必选:是 <br /> * 必选:是 <br />
* 默认值:无 <br />
* **schema**
* 描述编写Phoenix中的namespace该值设置为''
* 必选:是 <br />
* 默认值:无 <br /> * 默认值:无 <br />
* **table** * **table**
* 描述编写Phoenix中的表名,如果有namespace该值设置为'namespace.tablename' * 描述编写Phoenix中的表名该值设置为'tablename'
* 必选:是 <br /> * 必选:是 <br />
@ -109,7 +120,13 @@ hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实
* 必选:是 <br /> * 必选:是 <br />
* 默认值:无 <br /> * 默认值:无 <br />
* **where**
* 描述填写需要从phoenix表中读取条件判断。
* 可选:是 <br />
* 默认值:无 <br />
### 3.3 类型转换 ### 3.3 类型转换
@ -172,11 +189,14 @@ hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实
"hbaseConfig": { "hbaseConfig": {
"hbase.zookeeper.quorum": "hb-proxy-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-xxx-003.hbase.rds.aliyuncs.com" "hbase.zookeeper.quorum": "hb-proxy-xxx-002.hbase.rds.aliyuncs.com,hb-proxy-xxx-001.hbase.rds.aliyuncs.com,hb-proxy-xxx-003.hbase.rds.aliyuncs.com"
}, },
"schema": "TAG",
//填写要读取的phoenix的表名 //填写要读取的phoenix的表名
"table": "US_POPULATION", "table": "US_POPULATION",
//填写要读取的列名,不填读取所有列 //填写要读取的列名,不填读取所有列
"column": [ "column": [
] ],
//查询条件
"where": "id="
} }
}, },
"writer": { "writer": {
@ -204,7 +224,13 @@ hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实
* 必选:是 <br /> * 必选:是 <br />
* 默认值:无 <br /> * 默认值:无 <br />
* **schema**
* 描述编写Phoenix中的namespace该值设置为''
* 必选:是 <br />
* 默认值:无 <br />
* **table** * **table**
* 描述编写Phoenix中的表名,如果有namespace该值设置为'namespace.tablename' * 描述编写Phoenix中的表名,如果有namespace该值设置为'namespace.tablename'
@ -220,7 +246,13 @@ hbase11xsqlreader插件实现了从Phoenix(HBase SQL)读取数据。在底层实
* 必选:是 <br /> * 必选:是 <br />
* 默认值:无 <br /> * 默认值:无 <br />
* **where**
* 描述填写需要从phoenix表中读取条件判断。
* 可选:是 <br />
* 默认值:无 <br />
### 3.3 类型转换 ### 3.3 类型转换

View File

@ -26,9 +26,7 @@ import java.io.IOException;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.ArrayList; import java.util.*;
import java.util.List;
import java.util.Map;
public class HbaseSQLHelper { public class HbaseSQLHelper {
@ -50,11 +48,15 @@ public class HbaseSQLHelper {
String zkUrl = readerConfig.getZkUrl(); String zkUrl = readerConfig.getZkUrl();
PhoenixConfigurationUtil.setInputClass(conf, PhoenixRecordWritable.class); PhoenixConfigurationUtil.setInputClass(conf, PhoenixRecordWritable.class);
PhoenixConfigurationUtil.setInputTableName(conf, table);
PhoenixConfigurationUtil.setInputTableName(conf, readerConfig.getSchema()+"."+table);
if (!columns.isEmpty()) { if (!columns.isEmpty()) {
PhoenixConfigurationUtil.setSelectColumnNames(conf, columns.toArray(new String[columns.size()])); PhoenixConfigurationUtil.setSelectColumnNames(conf, columns.toArray(new String[columns.size()]));
} }
if(Objects.nonNull(readerConfig.getWhere())){
PhoenixConfigurationUtil.setInputTableConditions(conf,readerConfig.getWhere());
}
PhoenixEmbeddedDriver.ConnectionInfo info = null; PhoenixEmbeddedDriver.ConnectionInfo info = null;
try { try {
info = PhoenixEmbeddedDriver.ConnectionInfo.create(zkUrl); info = PhoenixEmbeddedDriver.ConnectionInfo.create(zkUrl);
@ -67,15 +69,19 @@ public class HbaseSQLHelper {
conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, info.getPort()); conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, info.getPort());
if (info.getRootNode() != null) if (info.getRootNode() != null)
conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, info.getRootNode()); conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, info.getRootNode());
conf.set(Key.NAME_SPACE_MAPPING_ENABLED,"true");
conf.set(Key.SYSTEM_TABLES_TO_NAMESPACE,"true");
return conf; return conf;
} }
public static List<String> getPColumnNames(String connectionString, String tableName) throws SQLException { public static List<String> getPColumnNames(String connectionString, String tableName,String schema) throws SQLException {
Connection con = Properties pro = new Properties();
DriverManager.getConnection(connectionString); pro.put(Key.NAME_SPACE_MAPPING_ENABLED, true);
pro.put(Key.SYSTEM_TABLES_TO_NAMESPACE, true);
Connection con = DriverManager.getConnection(connectionString,pro);
PhoenixConnection phoenixConnection = con.unwrap(PhoenixConnection.class); PhoenixConnection phoenixConnection = con.unwrap(PhoenixConnection.class);
MetaDataClient metaDataClient = new MetaDataClient(phoenixConnection); MetaDataClient metaDataClient = new MetaDataClient(phoenixConnection);
PTable table = metaDataClient.updateCache("", tableName).getTable(); PTable table = metaDataClient.updateCache(schema, tableName).getTable();
List<String> columnNames = new ArrayList<String>(); List<String> columnNames = new ArrayList<String>();
for (PColumn pColumn : table.getColumns()) { for (PColumn pColumn : table.getColumns()) {
if (!pColumn.getName().getString().equals(SaltingUtil.SALTING_COLUMN_NAME)) if (!pColumn.getName().getString().equals(SaltingUtil.SALTING_COLUMN_NAME))

View File

@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory;
import java.sql.SQLException; import java.sql.SQLException;
import java.util.List; import java.util.List;
import java.util.StringJoiner;
public class HbaseSQLReaderConfig { public class HbaseSQLReaderConfig {
private final static Logger LOG = LoggerFactory.getLogger(HbaseSQLReaderConfig.class); private final static Logger LOG = LoggerFactory.getLogger(HbaseSQLReaderConfig.class);
@ -27,6 +28,9 @@ public class HbaseSQLReaderConfig {
private String tableName; private String tableName;
private List<String> columns; // 目的表的所有列的列名包括主键和非主键不包括时间列 private List<String> columns; // 目的表的所有列的列名包括主键和非主键不包括时间列
private String where;//条件
private String schema;//
/** /**
* @return 获取原始的datax配置 * @return 获取原始的datax配置
*/ */
@ -96,22 +100,27 @@ public class HbaseSQLReaderConfig {
} }
String zkQuorum = zkCfg.getFirst(); String zkQuorum = zkCfg.getFirst();
String znode = zkCfg.getSecond(); String znode = zkCfg.getSecond();
if (zkQuorum == null || zkQuorum.isEmpty()) { if (zkQuorum == null || zkQuorum.isEmpty()) {
throw DataXException.asDataXException( throw DataXException.asDataXException(
HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "HBase的hbase.zookeeper.quorum配置不能为空" ); HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "HBase的hbase.zookeeper.quorum配置不能为空" );
} }
// 生成sql使用的连接字符串 格式 jdbc:hbase:zk_quorum:2181:/znode_parent // 生成sql使用的连接字符串 格式 jdbc:hbase:zk_quorum:2181:/znode_parent
cfg.connectionString = "jdbc:phoenix:" + zkQuorum; StringBuilder connectionString=new StringBuilder("jdbc:phoenix:");
cfg.zkUrl = zkQuorum + ":2181"; connectionString.append(zkQuorum);
cfg.connectionString = connectionString.toString();
StringBuilder zkUrl =new StringBuilder(zkQuorum);
cfg.zkUrl = zkUrl.append(":2181").toString();
if (!znode.isEmpty()) { if (!znode.isEmpty()) {
cfg.connectionString += cfg.connectionString + ":" + znode; cfg.connectionString = connectionString.append(":").append(znode).toString();
cfg.zkUrl += cfg.zkUrl + ":" + znode; cfg.zkUrl=zkUrl.append(":").append(znode).toString();
} }
} }
private static void parseTableConfig(HbaseSQLReaderConfig cfg, Configuration dataxCfg) { private static void parseTableConfig(HbaseSQLReaderConfig cfg, Configuration dataxCfg) {
// 解析并检查表名 // 解析并检查表名
cfg.tableName = dataxCfg.getString(Key.TABLE); cfg.tableName = dataxCfg.getString(Key.TABLE);
cfg.schema = dataxCfg.getString(Key.SCHEMA);
if (cfg.tableName == null || cfg.tableName.isEmpty()) { if (cfg.tableName == null || cfg.tableName.isEmpty()) {
throw DataXException.asDataXException( throw DataXException.asDataXException(
HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "HBase的tableName配置不能为空,请检查并修改配置." ); HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "HBase的tableName配置不能为空,请检查并修改配置." );
@ -124,13 +133,14 @@ public class HbaseSQLReaderConfig {
HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "您配置的tableName含有非法字符{0},请检查您的配置."); HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "您配置的tableName含有非法字符{0},请检查您的配置.");
} else if (cfg.columns.isEmpty()) { } else if (cfg.columns.isEmpty()) {
try { try {
cfg.columns = HbaseSQLHelper.getPColumnNames(cfg.connectionString, cfg.tableName); cfg.columns = HbaseSQLHelper.getPColumnNames(cfg.connectionString, cfg.tableName,cfg.schema);
dataxCfg.set(Key.COLUMN, cfg.columns); dataxCfg.set(Key.COLUMN, cfg.columns);
} catch (SQLException e) { } catch (SQLException e) {
throw DataXException.asDataXException( throw DataXException.asDataXException(
HbaseSQLReaderErrorCode.GET_PHOENIX_COLUMN_ERROR, "HBase的columns配置不能为空,请添加目标表的列名配置." + e.getMessage(), e); HbaseSQLReaderErrorCode.GET_PHOENIX_COLUMN_ERROR, "HBase的columns配置不能为空,请添加目标表的列名配置." + e.getMessage(), e);
} }
} }
cfg.where=dataxCfg.getString(Key.WHERE);
} }
@Override @Override
@ -151,6 +161,8 @@ public class HbaseSQLReaderConfig {
ret.append(","); ret.append(",");
} }
ret.setLength(ret.length() - 1); ret.setLength(ret.length() - 1);
ret.append("[where=]").append(getWhere());
ret.append("[schema=]").append(getSchema());
ret.append("\n"); ret.append("\n");
return ret.toString(); return ret.toString();
@ -161,4 +173,20 @@ public class HbaseSQLReaderConfig {
*/ */
private HbaseSQLReaderConfig() { private HbaseSQLReaderConfig() {
} }
public String getWhere() {
return where;
}
public void setWhere(String where) {
this.where = where;
}
public String getSchema() {
return schema;
}
public void setSchema(String schema) {
this.schema = schema;
}
} }

View File

@ -19,10 +19,8 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.math.BigDecimal; import java.math.BigDecimal;
import java.sql.*; import java.sql.*;
import java.util.HashMap; import java.sql.Date;
import java.util.LinkedHashMap; import java.util.*;
import java.util.List;
import java.util.Map;
/** /**
* Created by admin on 1/3/18. * Created by admin on 1/3/18.
@ -42,11 +40,14 @@ public class HbaseSQLReaderTask {
} }
private void getPColumns() throws SQLException { private void getPColumns() throws SQLException {
Properties pro = new Properties();
pro.put(Key.NAME_SPACE_MAPPING_ENABLED, true);
pro.put(Key.SYSTEM_TABLES_TO_NAMESPACE, true);
Connection con = Connection con =
DriverManager.getConnection(this.readerConfig.getConnectionString()); DriverManager.getConnection(this.readerConfig.getConnectionString(),pro);
PhoenixConnection phoenixConnection = con.unwrap(PhoenixConnection.class); PhoenixConnection phoenixConnection = con.unwrap(PhoenixConnection.class);
MetaDataClient metaDataClient = new MetaDataClient(phoenixConnection); MetaDataClient metaDataClient = new MetaDataClient(phoenixConnection);
PTable table = metaDataClient.updateCache("", this.readerConfig.getTableName()).getTable(); PTable table = metaDataClient.updateCache(this.readerConfig.getSchema(), this.readerConfig.getTableName()).getTable();
List<String> columnNames = this.readerConfig.getColumns(); List<String> columnNames = this.readerConfig.getColumns();
for (PColumn pColumn : table.getColumns()) { for (PColumn pColumn : table.getColumns()) {
if (columnNames.contains(pColumn.getName().getString())) { if (columnNames.contains(pColumn.getName().getString())) {

View File

@ -24,5 +24,18 @@ public final class Key {
* 必选列配置 * 必选列配置
*/ */
public final static String COLUMN = "column"; public final static String COLUMN = "column";
/**
*
*/
public static final String WHERE = "where";
/**
* 可选Phoenix表所属schema默认为空
*/
public static final String SCHEMA = "schema";
public static final String NAME_SPACE_MAPPING_ENABLED = "phoenix.schema.isNamespaceMappingEnabled";
public static final String SYSTEM_TABLES_TO_NAMESPACE = "phoenix.schema.mapSystemTablesToNamespace";
} }