From ca32cbd4790e3c0ac17e240ddb4a82e786601fac Mon Sep 17 00:00:00 2001 From: "qingdao.gqs" Date: Wed, 29 May 2019 15:20:51 +0800 Subject: [PATCH] =?UTF-8?q?hbase11xsqlwriter=E6=94=AF=E6=8C=81phoenix=20th?= =?UTF-8?q?inclient?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- hbase11xsqlwriter/pom.xml | 22 + .../writer/hbase11xsqlwriter/Constant.java | 1 + .../hbase11xsqlwriter/HbaseSQLHelper.java | 130 +++++- .../HbaseSQLWriterConfig.java | 80 +++- .../hbase11xsqlwriter/HbaseSQLWriterTask.java | 33 +- .../plugin/writer/hbase11xsqlwriter/Key.java | 7 + .../hbase11xsqlwriter/ThinClientPTable.java | 402 ++++++++++++++++++ .../plugin/rdbms/util/SqlFormatUtil.java | 359 ---------------- 8 files changed, 644 insertions(+), 390 deletions(-) create mode 100644 hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/ThinClientPTable.java delete mode 100755 plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/SqlFormatUtil.java diff --git a/hbase11xsqlwriter/pom.xml b/hbase11xsqlwriter/pom.xml index 0b8a2d51..3b75167e 100644 --- a/hbase11xsqlwriter/pom.xml +++ b/hbase11xsqlwriter/pom.xml @@ -18,6 +18,8 @@ 4.11.0-HBase-1.1 2.7.1 1.8 + 3.2.0 + 4.4.1 @@ -47,6 +49,11 @@ phoenix-core ${phoenix.version} + + org.apache.phoenix + phoenix-queryserver-client + ${phoenix.version} + com.google.guava guava @@ -58,6 +65,21 @@ ${commons-codec.version} + + + org.apache.httpcomponents + httpclient + ${httpclient.version} + + + + + com.google.protobuf + protobuf-java + ${protobuf.version} + + + junit diff --git a/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/Constant.java b/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/Constant.java index d45d30e1..5812655d 100755 --- a/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/Constant.java +++ b/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/Constant.java @@ -8,6 +8,7 @@ public final class Constant { public static final boolean DEFAULT_LAST_COLUMN_IS_VERSION = false; // 默认最后一列不是version列 public static final int DEFAULT_BATCH_ROW_COUNT = 256; // 默认一次写256行 public static final boolean DEFAULT_TRUNCATE = false; // 默认开始的时候不清空表 + public static final boolean DEFAULT_USE_THIN_CLIENT = false; // 默认不用thin客户端 public static final int TYPE_UNSIGNED_TINYINT = 11; public static final int TYPE_UNSIGNED_SMALLINT = 13; diff --git a/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/HbaseSQLHelper.java b/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/HbaseSQLHelper.java index 6146ac8d..41e57d4e 100644 --- a/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/HbaseSQLHelper.java +++ b/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/HbaseSQLHelper.java @@ -11,6 +11,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection; import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.types.PDataType; import org.apache.phoenix.util.SchemaUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -18,7 +19,11 @@ import org.slf4j.LoggerFactory; import java.io.IOException; import java.sql.Connection; import java.sql.DriverManager; +import java.sql.ResultSet; +import java.sql.ResultSetMetaData; import java.sql.SQLException; +import 
java.sql.Statement; +import java.util.HashMap; import java.util.List; import java.util.Map; @@ -28,6 +33,8 @@ import java.util.Map; public class HbaseSQLHelper { private static final Logger LOG = LoggerFactory.getLogger(HbaseSQLHelper.class); + public static ThinClientPTable ptable; + /** * 将datax的配置解析成sql writer的配置 */ @@ -53,6 +60,11 @@ public class HbaseSQLHelper { return new Pair(zkQuorum, znode); } + public static Map getThinConnectConfig(String hbaseCfgString) { + assert hbaseCfgString != null; + return JSON.parseObject(hbaseCfgString, new TypeReference>() {}); + } + /** * 校验配置 */ @@ -61,12 +73,12 @@ public class HbaseSQLHelper { Connection conn = getJdbcConnection(cfg); // 检查表:存在,可用 - checkTable(conn, cfg.getTableName()); + checkTable(conn, cfg.getNamespace(), cfg.getTableName(), cfg.isThinClient()); // 校验元数据:配置中给出的列必须是目的表中已经存在的列 PTable schema = null; try { - schema = getTableSchema(conn, cfg.getTableName()); + schema = getTableSchema(conn, cfg.getNamespace(), cfg.getTableName(), cfg.isThinClient()); } catch (SQLException e) { throw DataXException.asDataXException(HbaseSQLWriterErrorCode.GET_HBASE_CONNECTION_ERROR, "无法获取目的表" + cfg.getTableName() + "的元数据信息,表可能不是SQL表或表名配置错误,请检查您的配置 或者 联系 HBase 管理员.", e); @@ -97,7 +109,11 @@ public class HbaseSQLHelper { Connection conn; try { Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); - conn = DriverManager.getConnection(connStr); + if (cfg.isThinClient()) { + conn = getThinClientJdbcConnection(cfg); + } else { + conn = DriverManager.getConnection(connStr); + } conn.setAutoCommit(false); } catch (Throwable e) { throw DataXException.asDataXException(HbaseSQLWriterErrorCode.GET_HBASE_CONNECTION_ERROR, @@ -107,6 +123,32 @@ public class HbaseSQLHelper { return conn; } + /** + * 创建 thin client jdbc连接 + * @param cfg + * @return + * @throws SQLException + */ + public static Connection getThinClientJdbcConnection(HbaseSQLWriterConfig cfg) throws SQLException { + String connStr = cfg.getConnectionString(); + LOG.info("Connecting to HBase cluster [" + connStr + "] use thin client ..."); + Connection conn = DriverManager.getConnection(connStr, cfg.getUsername(), cfg.getPassword()); + String userNamespaceQuery = "use " + cfg.getNamespace(); + Statement statement = null; + try { + statement = conn.createStatement(); + statement.executeUpdate(userNamespaceQuery); + return conn; + } catch (Exception e) { + throw DataXException.asDataXException(HbaseSQLWriterErrorCode.GET_HBASE_CONNECTION_ERROR, + "无法连接配置的namespace, 请检查配置 或者 联系 HBase 管理员.", e); + } finally { + if (statement != null) { + statement.close(); + } + } + } + /** * 获取一张表的元数据信息 * @param conn hbsae sql的jdbc连接 @@ -121,6 +163,70 @@ public class HbaseSQLHelper { return mdc.updateCache(schemaName, tableName).getTable(); } + /** + * 获取一张表的元数据信息 + * @param conn + * @param namespace + * @param fullTableName + * @param isThinClient 是否使用thin client + * @return 表的元数据 + * @throws SQLException + */ + public static PTable getTableSchema(Connection conn, String namespace, String fullTableName, boolean isThinClient) + throws + SQLException { + LOG.info("Start to get table schema of namespace=" + namespace + " , fullTableName=" + fullTableName); + if (!isThinClient) { + return getTableSchema(conn, fullTableName); + } else { + if (ptable == null) { + ResultSet result = conn.getMetaData().getColumns(null, namespace, fullTableName, null); + try { + ThinClientPTable retTable = new ThinClientPTable(); + retTable.setColTypeMap(parseColType(result)); + ptable = retTable; + }finally { + if (result != null) { + 
result.close(); + } + } + } + return ptable; + } + + } + + /** + * 解析字段 + * @param rs + * @return + * @throws SQLException + */ + public static Map parseColType(ResultSet rs) throws SQLException { + Map cols = new HashMap(); + ResultSetMetaData md = rs.getMetaData(); + int columnCount = md.getColumnCount(); + + while (rs.next()) { + String colName = null; + PDataType colType = null; + for (int i = 1; i <= columnCount; i++) { + if (md.getColumnLabel(i).equals("TYPE_NAME")) { + colType = PDataType.fromSqlTypeName((String) rs.getObject(i)); + } else if (md.getColumnLabel(i).equals("COLUMN_NAME")) { + colName = (String) rs.getObject(i); + } + } + if (colType == null || colName == null) { + throw new SQLException("ColType or colName is null, colType : " + colType + " , colName : " + colName); + } + cols.put(colName, new ThinClientPTable.ThinClientPColumn(colName, colType)); + } + return cols; + } + + /** * 清空表 */ @@ -148,6 +254,24 @@ public class HbaseSQLHelper { } } + /** + * 检查表 + * @param conn + * @param namespace + * @param tableName + * @param isThinClient + * @throws DataXException + */ + public static void checkTable(Connection conn, String namespace, String tableName, boolean isThinClient) + throws DataXException { + if (!isThinClient) { + checkTable(conn, tableName); + } else { + //ignore check table when use thin client + } + } + + /** * 检查表:表要存在,enabled */ diff --git a/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/HbaseSQLWriterConfig.java b/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/HbaseSQLWriterConfig.java index ce8561fe..38ca58a9 100644 --- a/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/HbaseSQLWriterConfig.java +++ b/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/HbaseSQLWriterConfig.java @@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.writer.hbase11xsqlwriter; import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.util.Configuration; +import com.google.common.base.Strings; import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.util.Pair; @@ -9,6 +10,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.List; +import java.util.Map; /** * HBase SQL writer config @@ -30,6 +32,10 @@ public class HbaseSQLWriterConfig { private NullModeType nullMode; private int batchSize; // 一次批量写入多少行 private boolean truncate; // 导入开始前是否要清空目的表 + private boolean isThinClient; + private String namespace; + private String username; + private String password; /** * @return 获取原始的datax配置 @@ -81,6 +87,22 @@ public class HbaseSQLWriterConfig { return truncate; } + public boolean isThinClient() { + return isThinClient; + } + + public String getNamespace() { + return namespace; + } + + public String getPassword() { + return password; + } + + public String getUsername() { + return username; + } + /** * @param dataxCfg * @return @@ -100,6 +122,7 @@ public class HbaseSQLWriterConfig { cfg.nullMode = NullModeType.getByTypeName(dataxCfg.getString(Key.NULL_MODE, Constant.DEFAULT_NULL_MODE)); cfg.batchSize = dataxCfg.getInt(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_ROW_COUNT); cfg.truncate = dataxCfg.getBool(Key.TRUNCATE, Constant.DEFAULT_TRUNCATE); + cfg.isThinClient = dataxCfg.getBool(Key.THIN_CLIENT, Constant.DEFAULT_USE_THIN_CLIENT); // 4. 
打印解析出来的配置 LOG.info("HBase SQL writer config parsed:" + cfg.toString()); @@ -117,31 +140,52 @@ public class HbaseSQLWriterConfig { "读 Hbase 时需要配置hbaseConfig,其内容为 Hbase 连接信息,请联系 Hbase PE 获取该信息."); } - // 解析zk服务器和znode信息 - Pair zkCfg; - try { - zkCfg = HbaseSQLHelper.getHbaseConfig(hbaseCfg); - } catch (Throwable t) { - // 解析hbase配置错误 - throw DataXException.asDataXException( + + if (dataxCfg.getBool(Key.THIN_CLIENT, Constant.DEFAULT_USE_THIN_CLIENT)) { + Map thinConnectConfig = HbaseSQLHelper.getThinConnectConfig(hbaseCfg); + String thinConnectStr = thinConnectConfig.get(Key.HBASE_THIN_CONNECT_URL); + cfg.namespace = thinConnectConfig.get(Key.HBASE_THIN_CONNECT_NAMESPACE); + cfg.username = thinConnectConfig.get(Key.HBASE_THIN_CONNECT_USERNAME); + cfg.password = thinConnectConfig.get(Key.HBASE_THIN_CONNECT_PASSWORD); + if (Strings.isNullOrEmpty(thinConnectStr)) { + throw DataXException.asDataXException( + HbaseSQLWriterErrorCode.ILLEGAL_VALUE, + "thinClient=true的轻客户端模式下HBase的hbase.thin.connect.url配置不能为空,请联系HBase PE获取该信息."); + } + if (Strings.isNullOrEmpty(cfg.namespace) || Strings.isNullOrEmpty(cfg.username) || Strings + .isNullOrEmpty(cfg.password)) { + throw DataXException.asDataXException(HbaseSQLWriterErrorCode.ILLEGAL_VALUE, + "thinClient=true的轻客户端模式下HBase的hbase.thin.connect.namespce|username|password配置不能为空,请联系HBase " + + "PE获取该信息."); + } + cfg.connectionString = thinConnectStr; + } else { + // 解析zk服务器和znode信息 + Pair zkCfg; + try { + zkCfg = HbaseSQLHelper.getHbaseConfig(hbaseCfg); + } catch (Throwable t) { + // 解析hbase配置错误 + throw DataXException.asDataXException( HbaseSQLWriterErrorCode.REQUIRED_VALUE, "解析hbaseConfig出错,请确认您配置的hbaseConfig为合法的json数据格式,内容正确."); - } - String zkQuorum = zkCfg.getFirst(); - String znode = zkCfg.getSecond(); - if (zkQuorum == null || zkQuorum.isEmpty()) { - throw DataXException.asDataXException( + } + String zkQuorum = zkCfg.getFirst(); + String znode = zkCfg.getSecond(); + if (zkQuorum == null || zkQuorum.isEmpty()) { + throw DataXException.asDataXException( HbaseSQLWriterErrorCode.ILLEGAL_VALUE, "HBase的hbase.zookeeper.quorum配置不能为空,请联系HBase PE获取该信息."); - } - if (znode == null || znode.isEmpty()) { - throw DataXException.asDataXException( + } + if (znode == null || znode.isEmpty()) { + throw DataXException.asDataXException( HbaseSQLWriterErrorCode.ILLEGAL_VALUE, "HBase的zookeeper.znode.parent配置不能为空,请联系HBase PE获取该信息."); - } + } - // 生成sql使用的连接字符串, 格式: jdbc:phoenix:zk_quorum:2181:/znode_parent - cfg.connectionString = "jdbc:phoenix:" + zkQuorum + ":2181:" + znode; + // 生成sql使用的连接字符串, 格式: jdbc:phoenix:zk_quorum:2181:/znode_parent + cfg.connectionString = "jdbc:phoenix:" + zkQuorum + ":2181:" + znode; + } } private static void parseTableConfig(HbaseSQLWriterConfig cfg, Configuration dataxCfg) { diff --git a/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/HbaseSQLWriterTask.java b/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/HbaseSQLWriterTask.java index 1b00ea3f..0e752b01 100644 --- a/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/HbaseSQLWriterTask.java +++ b/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/HbaseSQLWriterTask.java @@ -157,12 +157,20 @@ public class HbaseSQLWriterTask { private PreparedStatement createPreparedStatement() throws SQLException { // 生成列名集合,列之间用逗号分隔: col1,col2,col3,... 
StringBuilder columnNamesBuilder = new StringBuilder(); - for (String col : cfg.getColumns()) { - // 列名使用双引号,则不自动转换为全大写,而是保留用户配置的大小写 - columnNamesBuilder.append("\""); - columnNamesBuilder.append(col); - columnNamesBuilder.append("\""); - columnNamesBuilder.append(","); + if (cfg.isThinClient()) { + for (String col : cfg.getColumns()) { + // thin 客户端不使用双引号 + columnNamesBuilder.append(col); + columnNamesBuilder.append(","); + } + } else { + for (String col : cfg.getColumns()) { + // 列名使用双引号,则不自动转换为全大写,而是保留用户配置的大小写 + columnNamesBuilder.append("\""); + columnNamesBuilder.append(col); + columnNamesBuilder.append("\""); + columnNamesBuilder.append(","); + } } columnNamesBuilder.setLength(columnNamesBuilder.length() - 1); // 移除末尾多余的逗号 String columnNames = columnNamesBuilder.toString(); @@ -171,9 +179,13 @@ public class HbaseSQLWriterTask { // 生成UPSERT模板 String tableName = cfg.getTableName(); - // 表名使用双引号,则不自动转换为全大写,而是保留用户配置的大小写 - StringBuilder upsertBuilder = - new StringBuilder("upsert into \"" + tableName + "\" (" + columnNames + " ) values ("); + StringBuilder upsertBuilder = null; + if (cfg.isThinClient()) { + upsertBuilder = new StringBuilder("upsert into " + tableName + " (" + columnNames + " ) values ("); + } else { + // 表名使用双引号,则不自动转换为全大写,而是保留用户配置的大小写 + upsertBuilder = new StringBuilder("upsert into \"" + tableName + "\" (" + columnNames + " ) values ("); + } for (int i = 0; i < cfg.getColumns().size(); i++) { upsertBuilder.append("?,"); } @@ -191,7 +203,8 @@ public class HbaseSQLWriterTask { */ private int[] getColumnSqlType(List columnNames) throws SQLException { int[] types = new int[numberOfColumnsToWrite]; - PTable ptable = HbaseSQLHelper.getTableSchema(connection, cfg.getTableName()); + PTable ptable = HbaseSQLHelper + .getTableSchema(connection, cfg.getNamespace(), cfg.getTableName(), cfg.isThinClient()); for (int i = 0; i < columnNames.size(); i++) { String name = columnNames.get(i); diff --git a/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/Key.java b/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/Key.java index 1b4f3816..131aba66 100755 --- a/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/Key.java +++ b/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/Key.java @@ -10,6 +10,10 @@ public final class Key { public final static String HBASE_CONFIG = "hbaseConfig"; public final static String HBASE_ZK_QUORUM = HConstants.ZOOKEEPER_QUORUM; public final static String HBASE_ZNODE_PARENT = HConstants.ZOOKEEPER_ZNODE_PARENT; + public final static String HBASE_THIN_CONNECT_URL = "hbase.thin.connect.url"; + public final static String HBASE_THIN_CONNECT_NAMESPACE = "hbase.thin.connect.namespace"; + public final static String HBASE_THIN_CONNECT_USERNAME = "hbase.thin.connect.username"; + public final static String HBASE_THIN_CONNECT_PASSWORD = "hbase.thin.connect.password"; /** * 【必选】writer要写入的表的表名 @@ -34,6 +38,9 @@ public final class Key { */ public static final String TRUNCATE = "truncate"; + + public static final String THIN_CLIENT = "thinClient"; + /** * 【可选】批量写入的最大行数,默认100行 */ diff --git a/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/ThinClientPTable.java b/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/ThinClientPTable.java new file mode 100644 index 00000000..49c2e061 --- /dev/null +++ 
b/hbase11xsqlwriter/src/main/java/com/alibaba/datax/plugin/writer/hbase11xsqlwriter/ThinClientPTable.java @@ -0,0 +1,402 @@ +package com.alibaba.datax.plugin.writer.hbase11xsqlwriter; + +import org.apache.hadoop.hbase.io.ImmutableBytesWritable; +import org.apache.phoenix.hbase.index.util.KeyValueBuilder; +import org.apache.phoenix.index.IndexMaintainer; +import org.apache.phoenix.jdbc.PhoenixConnection; +import org.apache.phoenix.schema.AmbiguousColumnException; +import org.apache.phoenix.schema.ColumnFamilyNotFoundException; +import org.apache.phoenix.schema.ColumnNotFoundException; +import org.apache.phoenix.schema.PColumn; +import org.apache.phoenix.schema.PColumnFamily; +import org.apache.phoenix.schema.PIndexState; +import org.apache.phoenix.schema.PName; +import org.apache.phoenix.schema.PRow; +import org.apache.phoenix.schema.PTable; +import org.apache.phoenix.schema.PTableKey; +import org.apache.phoenix.schema.PTableType; +import org.apache.phoenix.schema.RowKeySchema; +import org.apache.phoenix.schema.SortOrder; +import org.apache.phoenix.schema.types.PDataType; + +import java.util.List; +import java.util.Map; + +public class ThinClientPTable implements PTable { + + private Map colMap; + + public void setColTypeMap(Map colMap) { + this.colMap = colMap; + } + + @Override + public long getTimeStamp() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public long getSequenceNumber() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public long getIndexDisableTimestamp() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PName getName() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PName getSchemaName() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PName getTableName() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PName getTenantId() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PTableType getType() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PName getPKName() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public List getPKColumns() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public List getColumns() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public List getColumnFamilies() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PColumnFamily getColumnFamily(byte[] bytes) throws ColumnFamilyNotFoundException { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PColumnFamily getColumnFamily(String s) throws ColumnFamilyNotFoundException { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PColumn getColumnForColumnName(String colname) throws ColumnNotFoundException, AmbiguousColumnException { + if (!colMap.containsKey(colname)) { + throw new ColumnNotFoundException("Col " + colname + " not found"); + } + return colMap.get(colname); + } + + @Override + public PColumn getColumnForColumnQualifier(byte[] bytes, byte[] bytes1) + throws ColumnNotFoundException, AmbiguousColumnException { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PColumn getPKColumn(String s) throws ColumnNotFoundException { + throw new 
UnsupportedOperationException("Not implement"); + } + + @Override + public PRow newRow(KeyValueBuilder keyValueBuilder, long l, ImmutableBytesWritable immutableBytesWritable, boolean b, + byte[]... bytes) { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PRow newRow(KeyValueBuilder keyValueBuilder, ImmutableBytesWritable immutableBytesWritable, boolean b, + byte[]... bytes) { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public int newKey(ImmutableBytesWritable immutableBytesWritable, byte[][] bytes) { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public RowKeySchema getRowKeySchema() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public Integer getBucketNum() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public List getIndexes() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PIndexState getIndexState() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PName getParentName() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PName getParentTableName() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PName getParentSchemaName() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public List getPhysicalNames() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PName getPhysicalName() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public boolean isImmutableRows() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public boolean getIndexMaintainers(ImmutableBytesWritable immutableBytesWritable, + PhoenixConnection phoenixConnection) { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public IndexMaintainer getIndexMaintainer(PTable pTable, PhoenixConnection phoenixConnection) { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PName getDefaultFamilyName() { + return null; + } + + @Override + public boolean isWALDisabled() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public boolean isMultiTenant() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public boolean getStoreNulls() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public boolean isTransactional() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public ViewType getViewType() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public String getViewStatement() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public Short getViewIndexId() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PTableKey getKey() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public IndexType getIndexType() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public int getBaseColumnCount() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public boolean rowKeyOrderOptimizable() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public int getRowTimestampColPos() { + throw new 
UnsupportedOperationException("Not implement"); + } + + @Override + public long getUpdateCacheFrequency() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public boolean isNamespaceMapped() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public String getAutoPartitionSeqName() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public boolean isAppendOnlySchema() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public ImmutableStorageScheme getImmutableStorageScheme() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public QualifierEncodingScheme getEncodingScheme() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public EncodedCQCounter getEncodedCQCounter() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public boolean useStatsForParallelization() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public int getEstimatedSize() { + throw new UnsupportedOperationException("Not implement"); + } + + public static class ThinClientPColumn implements PColumn { + + private String colName; + + private PDataType pDataType; + + public ThinClientPColumn(String colName, PDataType pDataType) { + this.colName = colName; + this.pDataType = pDataType; + } + + @Override + public PName getName() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PName getFamilyName() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public int getPosition() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public Integer getArraySize() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public byte[] getViewConstant() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public boolean isViewReferenced() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public int getEstimatedSize() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public String getExpressionStr() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public boolean isRowTimestamp() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public boolean isDynamic() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public byte[] getColumnQualifierBytes() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public boolean isNullable() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public PDataType getDataType() { + return pDataType; + } + + @Override + public Integer getMaxLength() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public Integer getScale() { + throw new UnsupportedOperationException("Not implement"); + } + + @Override + public SortOrder getSortOrder() { + throw new UnsupportedOperationException("Not implement"); + } + } + +} diff --git a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/SqlFormatUtil.java b/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/SqlFormatUtil.java deleted file mode 100755 index 76137d31..00000000 --- a/plugin-rdbms-util/src/main/java/com/alibaba/datax/plugin/rdbms/util/SqlFormatUtil.java +++ /dev/null @@ -1,359 +0,0 @@ 
-package com.alibaba.datax.plugin.rdbms.util; - -import java.util.HashSet; -import java.util.LinkedList; -import java.util.Set; -import java.util.StringTokenizer; - -// TODO delete it -public class SqlFormatUtil { - - private static final Set BEGIN_CLAUSES = new HashSet(); - private static final Set END_CLAUSES = new HashSet(); - private static final Set LOGICAL = new HashSet(); - private static final Set QUANTIFIERS = new HashSet(); - private static final Set DML = new HashSet(); - private static final Set MISC = new HashSet(); - - private static final String WHITESPACE = " \n\r\f\t"; - - static { - BEGIN_CLAUSES.add("left"); - BEGIN_CLAUSES.add("right"); - BEGIN_CLAUSES.add("inner"); - BEGIN_CLAUSES.add("outer"); - BEGIN_CLAUSES.add("group"); - BEGIN_CLAUSES.add("order"); - - END_CLAUSES.add("where"); - END_CLAUSES.add("set"); - END_CLAUSES.add("having"); - END_CLAUSES.add("join"); - END_CLAUSES.add("from"); - END_CLAUSES.add("by"); - END_CLAUSES.add("join"); - END_CLAUSES.add("into"); - END_CLAUSES.add("union"); - - LOGICAL.add("and"); - LOGICAL.add("or"); - LOGICAL.add("when"); - LOGICAL.add("else"); - LOGICAL.add("end"); - - QUANTIFIERS.add("in"); - QUANTIFIERS.add("all"); - QUANTIFIERS.add("exists"); - QUANTIFIERS.add("some"); - QUANTIFIERS.add("any"); - - DML.add("insert"); - DML.add("update"); - DML.add("delete"); - - MISC.add("select"); - MISC.add("on"); - } - - static final String indentString = " "; - static final String initial = "\n "; - - public static String format(String source) { - return new FormatProcess(source).perform(); - } - - private static class FormatProcess { - boolean beginLine = true; - boolean afterBeginBeforeEnd = false; - boolean afterByOrSetOrFromOrSelect = false; - boolean afterValues = false; - boolean afterOn = false; - boolean afterBetween = false; - boolean afterInsert = false; - int inFunction = 0; - int parensSinceSelect = 0; - private LinkedList parenCounts = new LinkedList(); - private LinkedList afterByOrFromOrSelects = new LinkedList(); - - int indent = 1; - - StringBuilder result = new StringBuilder(); - StringTokenizer tokens; - String lastToken; - String token; - String lcToken; - - public FormatProcess(String sql) { - tokens = new StringTokenizer(sql, "()+*/-=<>'`\"[]," + WHITESPACE, - true); - } - - public String perform() { - - result.append(initial); - - while (tokens.hasMoreTokens()) { - token = tokens.nextToken(); - lcToken = token.toLowerCase(); - - if ("'".equals(token)) { - String t; - do { - t = tokens.nextToken(); - token += t; - } while (!"'".equals(t) && tokens.hasMoreTokens()); // cannot - // handle - // single - // quotes - } else if ("\"".equals(token)) { - String t; - do { - t = tokens.nextToken(); - token += t; - } while (!"\"".equals(t)); - } - - if (afterByOrSetOrFromOrSelect && ",".equals(token)) { - commaAfterByOrFromOrSelect(); - } else if (afterOn && ",".equals(token)) { - commaAfterOn(); - } - - else if ("(".equals(token)) { - openParen(); - } else if (")".equals(token)) { - closeParen(); - } - - else if (BEGIN_CLAUSES.contains(lcToken)) { - beginNewClause(); - } - - else if (END_CLAUSES.contains(lcToken)) { - endNewClause(); - } - - else if ("select".equals(lcToken)) { - select(); - } - - else if (DML.contains(lcToken)) { - updateOrInsertOrDelete(); - } - - else if ("values".equals(lcToken)) { - values(); - } - - else if ("on".equals(lcToken)) { - on(); - } - - else if (afterBetween && lcToken.equals("and")) { - misc(); - afterBetween = false; - } - - else if (LOGICAL.contains(lcToken)) { - logical(); - } - - else if 
(isWhitespace(token)) { - white(); - } - - else { - misc(); - } - - if (!isWhitespace(token)) { - lastToken = lcToken; - } - - } - return result.toString(); - } - - private void commaAfterOn() { - out(); - indent--; - newline(); - afterOn = false; - afterByOrSetOrFromOrSelect = true; - } - - private void commaAfterByOrFromOrSelect() { - out(); - newline(); - } - - private void logical() { - if ("end".equals(lcToken)) { - indent--; - } - newline(); - out(); - beginLine = false; - } - - private void on() { - indent++; - afterOn = true; - newline(); - out(); - beginLine = false; - } - - private void misc() { - out(); - if ("between".equals(lcToken)) { - afterBetween = true; - } - if (afterInsert) { - newline(); - afterInsert = false; - } else { - beginLine = false; - if ("case".equals(lcToken)) { - indent++; - } - } - } - - private void white() { - if (!beginLine) { - result.append(" "); - } - } - - private void updateOrInsertOrDelete() { - out(); - indent++; - beginLine = false; - if ("update".equals(lcToken)) { - newline(); - } - if ("insert".equals(lcToken)) { - afterInsert = true; - } - } - - private void select() { - out(); - indent++; - newline(); - parenCounts.addLast(Integer.valueOf(parensSinceSelect)); - afterByOrFromOrSelects.addLast(Boolean - .valueOf(afterByOrSetOrFromOrSelect)); - parensSinceSelect = 0; - afterByOrSetOrFromOrSelect = true; - } - - private void out() { - result.append(token); - } - - private void endNewClause() { - if (!afterBeginBeforeEnd) { - indent--; - if (afterOn) { - indent--; - afterOn = false; - } - newline(); - } - out(); - if (!"union".equals(lcToken)) { - indent++; - } - newline(); - afterBeginBeforeEnd = false; - afterByOrSetOrFromOrSelect = "by".equals(lcToken) - || "set".equals(lcToken) || "from".equals(lcToken); - } - - private void beginNewClause() { - if (!afterBeginBeforeEnd) { - if (afterOn) { - indent--; - afterOn = false; - } - indent--; - newline(); - } - out(); - beginLine = false; - afterBeginBeforeEnd = true; - } - - private void values() { - indent--; - newline(); - out(); - indent++; - newline(); - afterValues = true; - } - - private void closeParen() { - parensSinceSelect--; - if (parensSinceSelect < 0) { - indent--; - parensSinceSelect = parenCounts.removeLast().intValue(); - afterByOrSetOrFromOrSelect = afterByOrFromOrSelects - .removeLast().booleanValue(); - } - if (inFunction > 0) { - inFunction--; - out(); - } else { - if (!afterByOrSetOrFromOrSelect) { - indent--; - newline(); - } - out(); - } - beginLine = false; - } - - private void openParen() { - if (isFunctionName(lastToken) || inFunction > 0) { - inFunction++; - } - beginLine = false; - if (inFunction > 0) { - out(); - } else { - out(); - if (!afterByOrSetOrFromOrSelect) { - indent++; - newline(); - beginLine = true; - } - } - parensSinceSelect++; - } - - private static boolean isFunctionName(String tok) { - final char begin = tok.charAt(0); - final boolean isIdentifier = Character.isJavaIdentifierStart(begin) - || '"' == begin; - return isIdentifier && !LOGICAL.contains(tok) - && !END_CLAUSES.contains(tok) && !QUANTIFIERS.contains(tok) - && !DML.contains(tok) && !MISC.contains(tok); - } - - private static boolean isWhitespace(String token) { - return WHITESPACE.indexOf(token) >= 0; - } - - private void newline() { - result.append("\n"); - for (int i = 0; i < indent; i++) { - result.append(indentString); - } - beginLine = true; - } - } - -}
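
Usage note (illustrative, not part of the patch): with thinClient=true set on the writer, hbaseConfig is parsed as a flat JSON map (HbaseSQLHelper.getThinConnectConfig) and must supply the four keys added in Key.java: hbase.thin.connect.url, hbase.thin.connect.namespace, hbase.thin.connect.username and hbase.thin.connect.password. The sketch below mirrors getThinClientJdbcConnection and the thin-client branch of createPreparedStatement, under the assumption that hbase.thin.connect.url points at a Phoenix Query Server endpoint; the class name, URL, namespace, credentials and table/column names are placeholders, not values taken from this patch.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.PreparedStatement;
    import java.sql.Statement;

    // Minimal sketch of the thin-client path added by this patch (placeholders throughout).
    public class ThinClientUpsertSketch {
        public static void main(String[] args) throws Exception {
            // Typical Phoenix Query Server thin-client URL; host, port and serialization
            // depend on the target cluster and are assumptions here, not part of the patch.
            String url = "jdbc:phoenix:thin:url=http://pqs-host:8765;serialization=PROTOBUF";
            Connection conn = DriverManager.getConnection(url, "user", "password");
            conn.setAutoCommit(false);

            // getThinClientJdbcConnection switches to the configured namespace right after connecting.
            Statement stmt = conn.createStatement();
            try {
                stmt.executeUpdate("use demo_namespace");
            } finally {
                stmt.close();
            }

            // In thin-client mode createPreparedStatement does not double-quote identifiers,
            // so table and column names are normalized to upper case by Phoenix.
            PreparedStatement ps = conn.prepareStatement(
                    "upsert into DEMO_TABLE (COL1,COL2) values (?,?)");
            ps.setString(1, "k1");
            ps.setString(2, "v1");
            ps.executeUpdate();
            conn.commit();
            conn.close();
        }
    }

For the rest of the thin-client path, the patch skips the table existence check (checkTable becomes a no-op when isThinClient is true) and builds the column-type metadata once from DatabaseMetaData.getColumns, caching it in a ThinClientPTable instead of going through Phoenix's MetaDataClient.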