From 9d8fe30f6d8140415348941f218e18824bf0dc0b Mon Sep 17 00:00:00 2001 From: Administrator <1> Date: Wed, 16 Nov 2022 18:03:53 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E5=A4=8D=20Phoenix4=E7=89=88=E6=9C=AC?= =?UTF-8?q?=E5=90=8C=E6=AD=A5=E6=95=B0=E6=8D=AE=E6=B7=BB=E5=8A=A0where?= =?UTF-8?q?=E6=9D=A1=E4=BB=B6?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../hbase11xsqlreader/HbaseSQLHelper.java | 22 +++++++---- .../HbaseSQLReaderConfig.java | 38 ++++++++++++++++--- .../hbase11xsqlreader/HbaseSQLReaderTask.java | 13 ++++--- .../plugin/reader/hbase11xsqlreader/Key.java | 13 +++++++ 4 files changed, 67 insertions(+), 19 deletions(-) diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java index 5309d1d9..8c25fcc8 100644 --- a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java +++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLHelper.java @@ -26,9 +26,7 @@ import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; import java.sql.SQLException; -import java.util.ArrayList; -import java.util.List; -import java.util.Map; +import java.util.*; public class HbaseSQLHelper { @@ -50,11 +48,15 @@ public class HbaseSQLHelper { String zkUrl = readerConfig.getZkUrl(); PhoenixConfigurationUtil.setInputClass(conf, PhoenixRecordWritable.class); - PhoenixConfigurationUtil.setInputTableName(conf, table); + + PhoenixConfigurationUtil.setInputTableName(conf, readerConfig.getSchema()+"."+table); if (!columns.isEmpty()) { PhoenixConfigurationUtil.setSelectColumnNames(conf, columns.toArray(new String[columns.size()])); } + if(Objects.nonNull(readerConfig.getWhere())){ + PhoenixConfigurationUtil.setInputTableConditions(conf,readerConfig.getWhere()); + } PhoenixEmbeddedDriver.ConnectionInfo info = null; try { info = PhoenixEmbeddedDriver.ConnectionInfo.create(zkUrl); @@ -67,15 +69,19 @@ public class HbaseSQLHelper { conf.setInt(HConstants.ZOOKEEPER_CLIENT_PORT, info.getPort()); if (info.getRootNode() != null) conf.set(HConstants.ZOOKEEPER_ZNODE_PARENT, info.getRootNode()); + conf.set(Key.NAME_SPACE_MAPPING_ENABLED,"true"); + conf.set(Key.SYSTEM_TABLES_TO_NAMESPACE,"true"); return conf; } - public static List getPColumnNames(String connectionString, String tableName) throws SQLException { - Connection con = - DriverManager.getConnection(connectionString); + public static List getPColumnNames(String connectionString, String tableName,String schema) throws SQLException { + Properties pro = new Properties(); + pro.put(Key.NAME_SPACE_MAPPING_ENABLED, true); + pro.put(Key.SYSTEM_TABLES_TO_NAMESPACE, true); + Connection con = DriverManager.getConnection(connectionString,pro); PhoenixConnection phoenixConnection = con.unwrap(PhoenixConnection.class); MetaDataClient metaDataClient = new MetaDataClient(phoenixConnection); - PTable table = metaDataClient.updateCache("", tableName).getTable(); + PTable table = metaDataClient.updateCache(schema, tableName).getTable(); List columnNames = new ArrayList(); for (PColumn pColumn : table.getColumns()) { if (!pColumn.getName().getString().equals(SaltingUtil.SALTING_COLUMN_NAME)) diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderConfig.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderConfig.java index ab06f6e1..37060986 100644 --- a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderConfig.java +++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderConfig.java @@ -9,6 +9,7 @@ import org.slf4j.LoggerFactory; import java.sql.SQLException; import java.util.List; +import java.util.StringJoiner; public class HbaseSQLReaderConfig { private final static Logger LOG = LoggerFactory.getLogger(HbaseSQLReaderConfig.class); @@ -27,6 +28,9 @@ public class HbaseSQLReaderConfig { private String tableName; private List columns; // 目的表的所有列的列名,包括主键和非主键,不包括时间列 + private String where;//条件 + + private String schema;// /** * @return 获取原始的datax配置 */ @@ -96,22 +100,27 @@ public class HbaseSQLReaderConfig { } String zkQuorum = zkCfg.getFirst(); String znode = zkCfg.getSecond(); + if (zkQuorum == null || zkQuorum.isEmpty()) { throw DataXException.asDataXException( HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "HBase的hbase.zookeeper.quorum配置不能为空" ); } // 生成sql使用的连接字符串, 格式: jdbc:hbase:zk_quorum:2181:/znode_parent - cfg.connectionString = "jdbc:phoenix:" + zkQuorum; - cfg.zkUrl = zkQuorum + ":2181"; + StringBuilder connectionString=new StringBuilder("jdbc:phoenix:"); + connectionString.append(zkQuorum); + cfg.connectionString = connectionString.toString(); + StringBuilder zkUrl =new StringBuilder(zkQuorum); + cfg.zkUrl = zkUrl.append(":2181").toString(); if (!znode.isEmpty()) { - cfg.connectionString += cfg.connectionString + ":" + znode; - cfg.zkUrl += cfg.zkUrl + ":" + znode; + cfg.connectionString = connectionString.append(":").append(znode).toString(); + cfg.zkUrl=zkUrl.append(":").append(znode).toString(); } } private static void parseTableConfig(HbaseSQLReaderConfig cfg, Configuration dataxCfg) { // 解析并检查表名 cfg.tableName = dataxCfg.getString(Key.TABLE); + cfg.schema = dataxCfg.getString(Key.SCHEMA); if (cfg.tableName == null || cfg.tableName.isEmpty()) { throw DataXException.asDataXException( HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "HBase的tableName配置不能为空,请检查并修改配置." ); @@ -124,13 +133,14 @@ public class HbaseSQLReaderConfig { HbaseSQLReaderErrorCode.ILLEGAL_VALUE, "您配置的tableName含有非法字符{0},请检查您的配置."); } else if (cfg.columns.isEmpty()) { try { - cfg.columns = HbaseSQLHelper.getPColumnNames(cfg.connectionString, cfg.tableName); + cfg.columns = HbaseSQLHelper.getPColumnNames(cfg.connectionString, cfg.tableName,cfg.schema); dataxCfg.set(Key.COLUMN, cfg.columns); } catch (SQLException e) { throw DataXException.asDataXException( HbaseSQLReaderErrorCode.GET_PHOENIX_COLUMN_ERROR, "HBase的columns配置不能为空,请添加目标表的列名配置." + e.getMessage(), e); } } + cfg.where=dataxCfg.getString(Key.WHERE); } @Override @@ -151,6 +161,8 @@ public class HbaseSQLReaderConfig { ret.append(","); } ret.setLength(ret.length() - 1); + ret.append("[where=]").append(getWhere()); + ret.append("[schema=]").append(getSchema()); ret.append("\n"); return ret.toString(); @@ -161,4 +173,20 @@ public class HbaseSQLReaderConfig { */ private HbaseSQLReaderConfig() { } + + public String getWhere() { + return where; + } + + public void setWhere(String where) { + this.where = where; + } + + public String getSchema() { + return schema; + } + + public void setSchema(String schema) { + this.schema = schema; + } } diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTask.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTask.java index 1ca22c6f..461649d1 100644 --- a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTask.java +++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/HbaseSQLReaderTask.java @@ -19,10 +19,8 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.math.BigDecimal; import java.sql.*; -import java.util.HashMap; -import java.util.LinkedHashMap; -import java.util.List; -import java.util.Map; +import java.sql.Date; +import java.util.*; /** * Created by admin on 1/3/18. @@ -42,11 +40,14 @@ public class HbaseSQLReaderTask { } private void getPColumns() throws SQLException { + Properties pro = new Properties(); + pro.put(Key.NAME_SPACE_MAPPING_ENABLED, true); + pro.put(Key.SYSTEM_TABLES_TO_NAMESPACE, true); Connection con = - DriverManager.getConnection(this.readerConfig.getConnectionString()); + DriverManager.getConnection(this.readerConfig.getConnectionString(),pro); PhoenixConnection phoenixConnection = con.unwrap(PhoenixConnection.class); MetaDataClient metaDataClient = new MetaDataClient(phoenixConnection); - PTable table = metaDataClient.updateCache("", this.readerConfig.getTableName()).getTable(); + PTable table = metaDataClient.updateCache(this.readerConfig.getSchema(), this.readerConfig.getTableName()).getTable(); List columnNames = this.readerConfig.getColumns(); for (PColumn pColumn : table.getColumns()) { if (columnNames.contains(pColumn.getName().getString())) { diff --git a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/Key.java b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/Key.java index 7987d6c8..f8453add 100644 --- a/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/Key.java +++ b/hbase11xsqlreader/src/main/java/com/alibaba/datax/plugin/reader/hbase11xsqlreader/Key.java @@ -24,5 +24,18 @@ public final class Key { * 【必选】列配置 */ public final static String COLUMN = "column"; + /** + * + */ + public static final String WHERE = "where"; + + /** + * 【可选】Phoenix表所属schema,默认为空 + */ + public static final String SCHEMA = "schema"; + + public static final String NAME_SPACE_MAPPING_ENABLED = "phoenix.schema.isNamespaceMappingEnabled"; + + public static final String SYSTEM_TABLES_TO_NAMESPACE = "phoenix.schema.mapSystemTablesToNamespace"; }