Merge pull request #349 from qingsonggithub/master

hbase11xsqlwriter支持phoenix thinclient
This commit is contained in:
binaryWorld 2019-05-29 15:34:24 +08:00 committed by GitHub
commit 4b99fab875
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
8 changed files with 644 additions and 390 deletions

View File

@ -18,6 +18,8 @@
<phoenix.version>4.11.0-HBase-1.1</phoenix.version> <phoenix.version>4.11.0-HBase-1.1</phoenix.version>
<hadoop.version>2.7.1</hadoop.version> <hadoop.version>2.7.1</hadoop.version>
<commons-codec.version>1.8</commons-codec.version> <commons-codec.version>1.8</commons-codec.version>
<protobuf.version>3.2.0</protobuf.version>
<httpclient.version>4.4.1</httpclient.version>
</properties> </properties>
<dependencies> <dependencies>
@ -47,6 +49,11 @@
<artifactId>phoenix-core</artifactId> <artifactId>phoenix-core</artifactId>
<version>${phoenix.version}</version> <version>${phoenix.version}</version>
</dependency> </dependency>
<dependency>
<groupId>org.apache.phoenix</groupId>
<artifactId>phoenix-queryserver-client</artifactId>
<version>${phoenix.version}</version>
</dependency>
<dependency> <dependency>
<groupId>com.google.guava</groupId> <groupId>com.google.guava</groupId>
<artifactId>guava</artifactId> <artifactId>guava</artifactId>
@ -58,6 +65,21 @@
<version>${commons-codec.version}</version> <version>${commons-codec.version}</version>
</dependency> </dependency>
<!-- httpclient begin -->
<dependency>
<groupId>org.apache.httpcomponents</groupId>
<artifactId>httpclient</artifactId>
<version>${httpclient.version}</version>
</dependency>
<!-- httpclient end -->
<dependency>
<groupId>com.google.protobuf</groupId>
<artifactId>protobuf-java</artifactId>
<version>${protobuf.version}</version>
</dependency>
<!-- for test --> <!-- for test -->
<dependency> <dependency>
<groupId>junit</groupId> <groupId>junit</groupId>

View File

@ -8,6 +8,7 @@ public final class Constant {
public static final boolean DEFAULT_LAST_COLUMN_IS_VERSION = false; // 默认最后一列不是version列 public static final boolean DEFAULT_LAST_COLUMN_IS_VERSION = false; // 默认最后一列不是version列
public static final int DEFAULT_BATCH_ROW_COUNT = 256; // 默认一次写256行 public static final int DEFAULT_BATCH_ROW_COUNT = 256; // 默认一次写256行
public static final boolean DEFAULT_TRUNCATE = false; // 默认开始的时候不清空表 public static final boolean DEFAULT_TRUNCATE = false; // 默认开始的时候不清空表
public static final boolean DEFAULT_USE_THIN_CLIENT = false; // 默认不用thin客户端
public static final int TYPE_UNSIGNED_TINYINT = 11; public static final int TYPE_UNSIGNED_TINYINT = 11;
public static final int TYPE_UNSIGNED_SMALLINT = 13; public static final int TYPE_UNSIGNED_SMALLINT = 13;

View File

@ -11,6 +11,7 @@ import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.ColumnNotFoundException; import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.MetaDataClient; import org.apache.phoenix.schema.MetaDataClient;
import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.types.PDataType;
import org.apache.phoenix.util.SchemaUtil; import org.apache.phoenix.util.SchemaUtil;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -18,7 +19,11 @@ import org.slf4j.LoggerFactory;
import java.io.IOException; import java.io.IOException;
import java.sql.Connection; import java.sql.Connection;
import java.sql.DriverManager; import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException; import java.sql.SQLException;
import java.sql.Statement;
import java.util.HashMap;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
@ -28,6 +33,8 @@ import java.util.Map;
public class HbaseSQLHelper { public class HbaseSQLHelper {
private static final Logger LOG = LoggerFactory.getLogger(HbaseSQLHelper.class); private static final Logger LOG = LoggerFactory.getLogger(HbaseSQLHelper.class);
public static ThinClientPTable ptable;
/** /**
* 将datax的配置解析成sql writer的配置 * 将datax的配置解析成sql writer的配置
*/ */
@ -53,6 +60,11 @@ public class HbaseSQLHelper {
return new Pair<String, String>(zkQuorum, znode); return new Pair<String, String>(zkQuorum, znode);
} }
/**
 * Parses the hbaseConfig JSON string into a flat key/value map for the thin client
 * (keys such as hbase.thin.connect.url / namespace / username / password).
 *
 * @param hbaseCfgString JSON object string; must not be null (assert-checked only)
 * @return connection properties as a String-to-String map
 */
public static Map<String, String> getThinConnectConfig(String hbaseCfgString) {
assert hbaseCfgString != null;
return JSON.parseObject(hbaseCfgString, new TypeReference<Map<String, String>>() {});
}
/** /**
* 校验配置 * 校验配置
*/ */
@ -61,12 +73,12 @@ public class HbaseSQLHelper {
Connection conn = getJdbcConnection(cfg); Connection conn = getJdbcConnection(cfg);
// 检查表:存在可用 // 检查表:存在可用
checkTable(conn, cfg.getTableName()); checkTable(conn, cfg.getNamespace(), cfg.getTableName(), cfg.isThinClient());
// 校验元数据配置中给出的列必须是目的表中已经存在的列 // 校验元数据配置中给出的列必须是目的表中已经存在的列
PTable schema = null; PTable schema = null;
try { try {
schema = getTableSchema(conn, cfg.getTableName()); schema = getTableSchema(conn, cfg.getNamespace(), cfg.getTableName(), cfg.isThinClient());
} catch (SQLException e) { } catch (SQLException e) {
throw DataXException.asDataXException(HbaseSQLWriterErrorCode.GET_HBASE_CONNECTION_ERROR, throw DataXException.asDataXException(HbaseSQLWriterErrorCode.GET_HBASE_CONNECTION_ERROR,
"无法获取目的表" + cfg.getTableName() + "的元数据信息表可能不是SQL表或表名配置错误请检查您的配置 或者 联系 HBase 管理员.", e); "无法获取目的表" + cfg.getTableName() + "的元数据信息表可能不是SQL表或表名配置错误请检查您的配置 或者 联系 HBase 管理员.", e);
@ -97,7 +109,11 @@ public class HbaseSQLHelper {
Connection conn; Connection conn;
try { try {
Class.forName("org.apache.phoenix.jdbc.PhoenixDriver"); Class.forName("org.apache.phoenix.jdbc.PhoenixDriver");
conn = DriverManager.getConnection(connStr); if (cfg.isThinClient()) {
conn = getThinClientJdbcConnection(cfg);
} else {
conn = DriverManager.getConnection(connStr);
}
conn.setAutoCommit(false); conn.setAutoCommit(false);
} catch (Throwable e) { } catch (Throwable e) {
throw DataXException.asDataXException(HbaseSQLWriterErrorCode.GET_HBASE_CONNECTION_ERROR, throw DataXException.asDataXException(HbaseSQLWriterErrorCode.GET_HBASE_CONNECTION_ERROR,
@ -107,6 +123,32 @@ public class HbaseSQLHelper {
return conn; return conn;
} }
/**
* 创建 thin client jdbc连接
* @param cfg
* @return
* @throws SQLException
*/
/**
 * Creates a thin-client (Phoenix Query Server) JDBC connection and switches it to the
 * configured namespace.
 *
 * Fix: the original leaked the Connection when the "use namespace" statement failed —
 * it threw without closing {@code conn}. The connection is now closed (best-effort)
 * before rethrowing, and the Statement is managed with try-with-resources.
 *
 * @param cfg writer config carrying the thin connect URL, credentials and namespace
 * @return an open connection already bound to the configured namespace
 * @throws SQLException if the initial connect fails
 * @throws DataXException (runtime) if switching to the namespace fails
 */
public static Connection getThinClientJdbcConnection(HbaseSQLWriterConfig cfg) throws SQLException {
    String connStr = cfg.getConnectionString();
    LOG.info("Connecting to HBase cluster [" + connStr + "] use thin client ...");
    Connection conn = DriverManager.getConnection(connStr, cfg.getUsername(), cfg.getPassword());
    try (Statement statement = conn.createStatement()) {
        // bind this session to the configured namespace
        statement.executeUpdate("use " + cfg.getNamespace());
        return conn;
    } catch (Exception e) {
        try {
            conn.close();
        } catch (SQLException ignored) {
            // best-effort cleanup; the original failure below is the relevant one
        }
        throw DataXException.asDataXException(HbaseSQLWriterErrorCode.GET_HBASE_CONNECTION_ERROR,
                "无法连接配置的namespace, 请检查配置 或者 联系 HBase 管理员.", e);
    }
}
/** /**
* 获取一张表的元数据信息 * 获取一张表的元数据信息
* @param conn hbsae sql的jdbc连接 * @param conn hbsae sql的jdbc连接
@ -121,6 +163,70 @@ public class HbaseSQLHelper {
return mdc.updateCache(schemaName, tableName).getTable(); return mdc.updateCache(schemaName, tableName).getTable();
} }
/**
* 获取一张表的元数据信息
* @param conn
* @param namespace
* @param fullTableName
* @param isThinClient 是否使用thin client
* @return 表的元数据
* @throws SQLException
*/
/**
 * Fetches table metadata, via JDBC DatabaseMetaData when using the thin client,
 * or via the fat-client overload otherwise.
 *
 * NOTE(review): the static {@code ptable} cache is keyed on nothing — it is built once
 * and returned for every subsequent (namespace, table) pair. Presumably safe because a
 * writer job targets a single table; confirm before reusing this helper elsewhere.
 *
 * @param conn          an open JDBC connection
 * @param namespace     schema/namespace of the table (thin client only)
 * @param fullTableName table to describe
 * @param isThinClient  whether the connection is a thin-client connection
 * @return the table metadata
 * @throws SQLException on metadata lookup failure
 */
public static PTable getTableSchema(Connection conn, String namespace, String fullTableName, boolean isThinClient)
        throws
        SQLException {
    LOG.info("Start to get table schema of namespace=" + namespace + " , fullTableName=" + fullTableName);
    if (!isThinClient) {
        return getTableSchema(conn, fullTableName);
    }
    if (ptable == null) {
        // try-with-resources replaces the manual null-checked finally/close of the original
        try (ResultSet result = conn.getMetaData().getColumns(null, namespace, fullTableName, null)) {
            ThinClientPTable retTable = new ThinClientPTable();
            retTable.setColTypeMap(parseColType(result));
            ptable = retTable;
        }
    }
    return ptable;
}
/**
* 解析字段
* @param rs
* @return
* @throws SQLException
*/
/**
 * Builds the column-name -> column-descriptor map from a DatabaseMetaData.getColumns()
 * result set, reading the COLUMN_NAME and TYPE_NAME labels of each row.
 *
 * @param rs column metadata result set (caller closes it)
 * @return map from column name to its thin-client column descriptor
 * @throws SQLException if a row is missing its name or type
 */
public static Map<String, ThinClientPTable.ThinClientPColumn> parseColType(ResultSet rs) throws SQLException {
    Map<String, ThinClientPTable.ThinClientPColumn> columns =
            new HashMap<String, ThinClientPTable.ThinClientPColumn>();
    ResultSetMetaData meta = rs.getMetaData();
    int labelCount = meta.getColumnCount();
    while (rs.next()) {
        String name = null;
        PDataType type = null;
        // scan every label of the row; only two of them matter
        for (int idx = 1; idx <= labelCount; idx++) {
            String label = meta.getColumnLabel(idx);
            if ("TYPE_NAME".equals(label)) {
                type = PDataType.fromSqlTypeName((String) rs.getObject(idx));
            } else if ("COLUMN_NAME".equals(label)) {
                name = (String) rs.getObject(idx);
            }
        }
        if (type == null || name == null) {
            throw new SQLException("ColType or colName is null, colType : " + type + " , colName : " + name);
        }
        columns.put(name, new ThinClientPTable.ThinClientPColumn(name, type));
    }
    return columns;
}
/** /**
* 清空表 * 清空表
*/ */
@ -148,6 +254,24 @@ public class HbaseSQLHelper {
} }
} }
/**
* 检查表
* @param conn
* @param namespace
* @param tableName
* @param isThinClient
* @throws DataXException
*/
/**
 * Validates that the target table exists and is enabled.
 * The check is only performed for the fat client; the thin client (Query Server)
 * path deliberately skips it.
 *
 * @param conn         open JDBC connection
 * @param namespace    namespace of the table (unused for the fat-client check)
 * @param tableName    table to verify
 * @param isThinClient whether the thin client is in use
 * @throws DataXException if the fat-client check fails
 */
public static void checkTable(Connection conn, String namespace, String tableName, boolean isThinClient)
        throws DataXException {
    if (isThinClient) {
        // table check intentionally skipped when using the thin client
        return;
    }
    checkTable(conn, tableName);
}
/** /**
* 检查表表要存在enabled * 检查表表要存在enabled
*/ */

View File

@ -2,6 +2,7 @@ package com.alibaba.datax.plugin.writer.hbase11xsqlwriter;
import com.alibaba.datax.common.exception.DataXException; import com.alibaba.datax.common.exception.DataXException;
import com.alibaba.datax.common.util.Configuration; import com.alibaba.datax.common.util.Configuration;
import com.google.common.base.Strings;
import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.hbase.TableName; import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.util.Pair; import org.apache.hadoop.hbase.util.Pair;
@ -9,6 +10,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.List; import java.util.List;
import java.util.Map;
/** /**
* HBase SQL writer config * HBase SQL writer config
@ -30,6 +32,10 @@ public class HbaseSQLWriterConfig {
private NullModeType nullMode; private NullModeType nullMode;
private int batchSize; // 一次批量写入多少行 private int batchSize; // 一次批量写入多少行
private boolean truncate; // 导入开始前是否要清空目的表 private boolean truncate; // 导入开始前是否要清空目的表
private boolean isThinClient;
private String namespace;
private String username;
private String password;
/** /**
* @return 获取原始的datax配置 * @return 获取原始的datax配置
@ -81,6 +87,22 @@ public class HbaseSQLWriterConfig {
return truncate; return truncate;
} }
/** @return whether the thin client (Phoenix Query Server) mode is enabled. */
public boolean isThinClient() {
return isThinClient;
}
/** @return the namespace configured for thin-client connections. */
public String getNamespace() {
return namespace;
}
/** @return the password for thin-client connections. */
public String getPassword() {
return password;
}
/** @return the username for thin-client connections. */
public String getUsername() {
return username;
}
/** /**
* @param dataxCfg * @param dataxCfg
* @return * @return
@ -100,6 +122,7 @@ public class HbaseSQLWriterConfig {
cfg.nullMode = NullModeType.getByTypeName(dataxCfg.getString(Key.NULL_MODE, Constant.DEFAULT_NULL_MODE)); cfg.nullMode = NullModeType.getByTypeName(dataxCfg.getString(Key.NULL_MODE, Constant.DEFAULT_NULL_MODE));
cfg.batchSize = dataxCfg.getInt(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_ROW_COUNT); cfg.batchSize = dataxCfg.getInt(Key.BATCH_SIZE, Constant.DEFAULT_BATCH_ROW_COUNT);
cfg.truncate = dataxCfg.getBool(Key.TRUNCATE, Constant.DEFAULT_TRUNCATE); cfg.truncate = dataxCfg.getBool(Key.TRUNCATE, Constant.DEFAULT_TRUNCATE);
cfg.isThinClient = dataxCfg.getBool(Key.THIN_CLIENT, Constant.DEFAULT_USE_THIN_CLIENT);
// 4. 打印解析出来的配置 // 4. 打印解析出来的配置
LOG.info("HBase SQL writer config parsed:" + cfg.toString()); LOG.info("HBase SQL writer config parsed:" + cfg.toString());
@ -117,31 +140,52 @@ public class HbaseSQLWriterConfig {
"读 Hbase 时需要配置hbaseConfig其内容为 Hbase 连接信息,请联系 Hbase PE 获取该信息."); "读 Hbase 时需要配置hbaseConfig其内容为 Hbase 连接信息,请联系 Hbase PE 获取该信息.");
} }
// 解析zk服务器和znode信息
Pair<String, String> zkCfg; if (dataxCfg.getBool(Key.THIN_CLIENT, Constant.DEFAULT_USE_THIN_CLIENT)) {
try { Map<String, String> thinConnectConfig = HbaseSQLHelper.getThinConnectConfig(hbaseCfg);
zkCfg = HbaseSQLHelper.getHbaseConfig(hbaseCfg); String thinConnectStr = thinConnectConfig.get(Key.HBASE_THIN_CONNECT_URL);
} catch (Throwable t) { cfg.namespace = thinConnectConfig.get(Key.HBASE_THIN_CONNECT_NAMESPACE);
// 解析hbase配置错误 cfg.username = thinConnectConfig.get(Key.HBASE_THIN_CONNECT_USERNAME);
throw DataXException.asDataXException( cfg.password = thinConnectConfig.get(Key.HBASE_THIN_CONNECT_PASSWORD);
if (Strings.isNullOrEmpty(thinConnectStr)) {
throw DataXException.asDataXException(
HbaseSQLWriterErrorCode.ILLEGAL_VALUE,
"thinClient=true的轻客户端模式下HBase的hbase.thin.connect.url配置不能为空请联系HBase PE获取该信息.");
}
if (Strings.isNullOrEmpty(cfg.namespace) || Strings.isNullOrEmpty(cfg.username) || Strings
.isNullOrEmpty(cfg.password)) {
throw DataXException.asDataXException(HbaseSQLWriterErrorCode.ILLEGAL_VALUE,
"thinClient=true的轻客户端模式下HBase的hbase.thin.connect.namespce|username|password配置不能为空请联系HBase "
+ "PE获取该信息.");
}
cfg.connectionString = thinConnectStr;
} else {
// 解析zk服务器和znode信息
Pair<String, String> zkCfg;
try {
zkCfg = HbaseSQLHelper.getHbaseConfig(hbaseCfg);
} catch (Throwable t) {
// 解析hbase配置错误
throw DataXException.asDataXException(
HbaseSQLWriterErrorCode.REQUIRED_VALUE, HbaseSQLWriterErrorCode.REQUIRED_VALUE,
"解析hbaseConfig出错请确认您配置的hbaseConfig为合法的json数据格式内容正确."); "解析hbaseConfig出错请确认您配置的hbaseConfig为合法的json数据格式内容正确.");
} }
String zkQuorum = zkCfg.getFirst(); String zkQuorum = zkCfg.getFirst();
String znode = zkCfg.getSecond(); String znode = zkCfg.getSecond();
if (zkQuorum == null || zkQuorum.isEmpty()) { if (zkQuorum == null || zkQuorum.isEmpty()) {
throw DataXException.asDataXException( throw DataXException.asDataXException(
HbaseSQLWriterErrorCode.ILLEGAL_VALUE, HbaseSQLWriterErrorCode.ILLEGAL_VALUE,
"HBase的hbase.zookeeper.quorum配置不能为空请联系HBase PE获取该信息."); "HBase的hbase.zookeeper.quorum配置不能为空请联系HBase PE获取该信息.");
} }
if (znode == null || znode.isEmpty()) { if (znode == null || znode.isEmpty()) {
throw DataXException.asDataXException( throw DataXException.asDataXException(
HbaseSQLWriterErrorCode.ILLEGAL_VALUE, HbaseSQLWriterErrorCode.ILLEGAL_VALUE,
"HBase的zookeeper.znode.parent配置不能为空请联系HBase PE获取该信息."); "HBase的zookeeper.znode.parent配置不能为空请联系HBase PE获取该信息.");
} }
// 生成sql使用的连接字符串 格式 jdbc:phoenix:zk_quorum:2181:/znode_parent // 生成sql使用的连接字符串 格式 jdbc:phoenix:zk_quorum:2181:/znode_parent
cfg.connectionString = "jdbc:phoenix:" + zkQuorum + ":2181:" + znode; cfg.connectionString = "jdbc:phoenix:" + zkQuorum + ":2181:" + znode;
}
} }
private static void parseTableConfig(HbaseSQLWriterConfig cfg, Configuration dataxCfg) { private static void parseTableConfig(HbaseSQLWriterConfig cfg, Configuration dataxCfg) {

View File

@ -157,12 +157,20 @@ public class HbaseSQLWriterTask {
private PreparedStatement createPreparedStatement() throws SQLException { private PreparedStatement createPreparedStatement() throws SQLException {
// 生成列名集合列之间用逗号分隔 col1,col2,col3,... // 生成列名集合列之间用逗号分隔 col1,col2,col3,...
StringBuilder columnNamesBuilder = new StringBuilder(); StringBuilder columnNamesBuilder = new StringBuilder();
for (String col : cfg.getColumns()) { if (cfg.isThinClient()) {
// 列名使用双引号则不自动转换为全大写而是保留用户配置的大小写 for (String col : cfg.getColumns()) {
columnNamesBuilder.append("\""); // thin 客户端不使用双引号
columnNamesBuilder.append(col); columnNamesBuilder.append(col);
columnNamesBuilder.append("\""); columnNamesBuilder.append(",");
columnNamesBuilder.append(","); }
} else {
for (String col : cfg.getColumns()) {
// 列名使用双引号则不自动转换为全大写而是保留用户配置的大小写
columnNamesBuilder.append("\"");
columnNamesBuilder.append(col);
columnNamesBuilder.append("\"");
columnNamesBuilder.append(",");
}
} }
columnNamesBuilder.setLength(columnNamesBuilder.length() - 1); // 移除末尾多余的逗号 columnNamesBuilder.setLength(columnNamesBuilder.length() - 1); // 移除末尾多余的逗号
String columnNames = columnNamesBuilder.toString(); String columnNames = columnNamesBuilder.toString();
@ -171,9 +179,13 @@ public class HbaseSQLWriterTask {
// 生成UPSERT模板 // 生成UPSERT模板
String tableName = cfg.getTableName(); String tableName = cfg.getTableName();
// 表名使用双引号则不自动转换为全大写而是保留用户配置的大小写 StringBuilder upsertBuilder = null;
StringBuilder upsertBuilder = if (cfg.isThinClient()) {
new StringBuilder("upsert into \"" + tableName + "\" (" + columnNames + " ) values ("); upsertBuilder = new StringBuilder("upsert into " + tableName + " (" + columnNames + " ) values (");
} else {
// 表名使用双引号则不自动转换为全大写而是保留用户配置的大小写
upsertBuilder = new StringBuilder("upsert into \"" + tableName + "\" (" + columnNames + " ) values (");
}
for (int i = 0; i < cfg.getColumns().size(); i++) { for (int i = 0; i < cfg.getColumns().size(); i++) {
upsertBuilder.append("?,"); upsertBuilder.append("?,");
} }
@ -191,7 +203,8 @@ public class HbaseSQLWriterTask {
*/ */
private int[] getColumnSqlType(List<String> columnNames) throws SQLException { private int[] getColumnSqlType(List<String> columnNames) throws SQLException {
int[] types = new int[numberOfColumnsToWrite]; int[] types = new int[numberOfColumnsToWrite];
PTable ptable = HbaseSQLHelper.getTableSchema(connection, cfg.getTableName()); PTable ptable = HbaseSQLHelper
.getTableSchema(connection, cfg.getNamespace(), cfg.getTableName(), cfg.isThinClient());
for (int i = 0; i < columnNames.size(); i++) { for (int i = 0; i < columnNames.size(); i++) {
String name = columnNames.get(i); String name = columnNames.get(i);

View File

@ -10,6 +10,10 @@ public final class Key {
public final static String HBASE_CONFIG = "hbaseConfig"; public final static String HBASE_CONFIG = "hbaseConfig";
public final static String HBASE_ZK_QUORUM = HConstants.ZOOKEEPER_QUORUM; public final static String HBASE_ZK_QUORUM = HConstants.ZOOKEEPER_QUORUM;
public final static String HBASE_ZNODE_PARENT = HConstants.ZOOKEEPER_ZNODE_PARENT; public final static String HBASE_ZNODE_PARENT = HConstants.ZOOKEEPER_ZNODE_PARENT;
public final static String HBASE_THIN_CONNECT_URL = "hbase.thin.connect.url";
public final static String HBASE_THIN_CONNECT_NAMESPACE = "hbase.thin.connect.namespace";
public final static String HBASE_THIN_CONNECT_USERNAME = "hbase.thin.connect.username";
public final static String HBASE_THIN_CONNECT_PASSWORD = "hbase.thin.connect.password";
/** /**
* 必选writer要写入的表的表名 * 必选writer要写入的表的表名
@ -34,6 +38,9 @@ public final class Key {
*/ */
public static final String TRUNCATE = "truncate"; public static final String TRUNCATE = "truncate";
public static final String THIN_CLIENT = "thinClient";
/** /**
* 可选批量写入的最大行数默认100行 * 可选批量写入的最大行数默认100行
*/ */

View File

@ -0,0 +1,402 @@
package com.alibaba.datax.plugin.writer.hbase11xsqlwriter;
import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
import org.apache.phoenix.index.IndexMaintainer;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.schema.AmbiguousColumnException;
import org.apache.phoenix.schema.ColumnFamilyNotFoundException;
import org.apache.phoenix.schema.ColumnNotFoundException;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnFamily;
import org.apache.phoenix.schema.PIndexState;
import org.apache.phoenix.schema.PName;
import org.apache.phoenix.schema.PRow;
import org.apache.phoenix.schema.PTable;
import org.apache.phoenix.schema.PTableKey;
import org.apache.phoenix.schema.PTableType;
import org.apache.phoenix.schema.RowKeySchema;
import org.apache.phoenix.schema.SortOrder;
import org.apache.phoenix.schema.types.PDataType;
import java.util.List;
import java.util.Map;
/**
 * Minimal {@link PTable} adapter used when writing through the Phoenix thin client
 * (Phoenix Query Server). The thin client cannot return a real PTable, so this class
 * carries only the column-name -> column-type mapping the writer needs for SQL type
 * lookup ({@link #getColumnForColumnName(String)}); every other PTable operation is
 * unsupported and throws.
 *
 * Improvements over the original: the ~60 duplicated
 * {@code throw new UnsupportedOperationException("Not implement")} statements are
 * routed through one helper (message unchanged), and the inner column class is
 * immutable ({@code private final} fields).
 */
public class ThinClientPTable implements PTable {

    // column name -> descriptor, populated from JDBC metadata by the caller
    private Map<String, ThinClientPColumn> colMap;

    public void setColTypeMap(Map<String, ThinClientPColumn> colMap) {
        this.colMap = colMap;
    }

    /** Single factory for the shared "unsupported" failure; message kept identical to the original. */
    private static UnsupportedOperationException unsupported() {
        return new UnsupportedOperationException("Not implement");
    }

    @Override
    public long getTimeStamp() {
        throw unsupported();
    }

    @Override
    public long getSequenceNumber() {
        throw unsupported();
    }

    @Override
    public long getIndexDisableTimestamp() {
        throw unsupported();
    }

    @Override
    public PName getName() {
        throw unsupported();
    }

    @Override
    public PName getSchemaName() {
        throw unsupported();
    }

    @Override
    public PName getTableName() {
        throw unsupported();
    }

    @Override
    public PName getTenantId() {
        throw unsupported();
    }

    @Override
    public PTableType getType() {
        throw unsupported();
    }

    @Override
    public PName getPKName() {
        throw unsupported();
    }

    @Override
    public List<PColumn> getPKColumns() {
        throw unsupported();
    }

    @Override
    public List<PColumn> getColumns() {
        throw unsupported();
    }

    @Override
    public List<PColumnFamily> getColumnFamilies() {
        throw unsupported();
    }

    @Override
    public PColumnFamily getColumnFamily(byte[] bytes) throws ColumnFamilyNotFoundException {
        throw unsupported();
    }

    @Override
    public PColumnFamily getColumnFamily(String s) throws ColumnFamilyNotFoundException {
        throw unsupported();
    }

    /**
     * The one real lookup this adapter supports: resolves a column by name from the
     * map installed via {@link #setColTypeMap(Map)}.
     *
     * @throws ColumnNotFoundException if the column is not in the map
     */
    @Override
    public PColumn getColumnForColumnName(String colname) throws ColumnNotFoundException, AmbiguousColumnException {
        if (!colMap.containsKey(colname)) {
            throw new ColumnNotFoundException("Col " + colname + " not found");
        }
        return colMap.get(colname);
    }

    @Override
    public PColumn getColumnForColumnQualifier(byte[] bytes, byte[] bytes1)
            throws ColumnNotFoundException, AmbiguousColumnException {
        throw unsupported();
    }

    @Override
    public PColumn getPKColumn(String s) throws ColumnNotFoundException {
        throw unsupported();
    }

    @Override
    public PRow newRow(KeyValueBuilder keyValueBuilder, long l, ImmutableBytesWritable immutableBytesWritable, boolean b,
                       byte[]... bytes) {
        throw unsupported();
    }

    @Override
    public PRow newRow(KeyValueBuilder keyValueBuilder, ImmutableBytesWritable immutableBytesWritable, boolean b,
                       byte[]... bytes) {
        throw unsupported();
    }

    @Override
    public int newKey(ImmutableBytesWritable immutableBytesWritable, byte[][] bytes) {
        throw unsupported();
    }

    @Override
    public RowKeySchema getRowKeySchema() {
        throw unsupported();
    }

    @Override
    public Integer getBucketNum() {
        throw unsupported();
    }

    @Override
    public List<PTable> getIndexes() {
        throw unsupported();
    }

    @Override
    public PIndexState getIndexState() {
        throw unsupported();
    }

    @Override
    public PName getParentName() {
        throw unsupported();
    }

    @Override
    public PName getParentTableName() {
        throw unsupported();
    }

    @Override
    public PName getParentSchemaName() {
        throw unsupported();
    }

    @Override
    public List<PName> getPhysicalNames() {
        throw unsupported();
    }

    @Override
    public PName getPhysicalName() {
        throw unsupported();
    }

    @Override
    public boolean isImmutableRows() {
        throw unsupported();
    }

    @Override
    public boolean getIndexMaintainers(ImmutableBytesWritable immutableBytesWritable,
                                       PhoenixConnection phoenixConnection) {
        throw unsupported();
    }

    @Override
    public IndexMaintainer getIndexMaintainer(PTable pTable, PhoenixConnection phoenixConnection) {
        throw unsupported();
    }

    @Override
    public PName getDefaultFamilyName() {
        // returns null instead of throwing — NOTE(review): presumably queried by Phoenix
        // internals on the write path; confirm before changing to unsupported()
        return null;
    }

    @Override
    public boolean isWALDisabled() {
        throw unsupported();
    }

    @Override
    public boolean isMultiTenant() {
        throw unsupported();
    }

    @Override
    public boolean getStoreNulls() {
        throw unsupported();
    }

    @Override
    public boolean isTransactional() {
        throw unsupported();
    }

    @Override
    public ViewType getViewType() {
        throw unsupported();
    }

    @Override
    public String getViewStatement() {
        throw unsupported();
    }

    @Override
    public Short getViewIndexId() {
        throw unsupported();
    }

    @Override
    public PTableKey getKey() {
        throw unsupported();
    }

    @Override
    public IndexType getIndexType() {
        throw unsupported();
    }

    @Override
    public int getBaseColumnCount() {
        throw unsupported();
    }

    @Override
    public boolean rowKeyOrderOptimizable() {
        throw unsupported();
    }

    @Override
    public int getRowTimestampColPos() {
        throw unsupported();
    }

    @Override
    public long getUpdateCacheFrequency() {
        throw unsupported();
    }

    @Override
    public boolean isNamespaceMapped() {
        throw unsupported();
    }

    @Override
    public String getAutoPartitionSeqName() {
        throw unsupported();
    }

    @Override
    public boolean isAppendOnlySchema() {
        throw unsupported();
    }

    @Override
    public ImmutableStorageScheme getImmutableStorageScheme() {
        throw unsupported();
    }

    @Override
    public QualifierEncodingScheme getEncodingScheme() {
        throw unsupported();
    }

    @Override
    public EncodedCQCounter getEncodedCQCounter() {
        throw unsupported();
    }

    @Override
    public boolean useStatsForParallelization() {
        throw unsupported();
    }

    @Override
    public int getEstimatedSize() {
        throw unsupported();
    }

    /**
     * Immutable column descriptor: only {@link #getDataType()} is supported; every
     * other PColumn operation throws.
     */
    public static class ThinClientPColumn implements PColumn {

        private final String colName;
        private final PDataType pDataType;

        public ThinClientPColumn(String colName, PDataType pDataType) {
            this.colName = colName;
            this.pDataType = pDataType;
        }

        @Override
        public PName getName() {
            throw unsupported();
        }

        @Override
        public PName getFamilyName() {
            throw unsupported();
        }

        @Override
        public int getPosition() {
            throw unsupported();
        }

        @Override
        public Integer getArraySize() {
            throw unsupported();
        }

        @Override
        public byte[] getViewConstant() {
            throw unsupported();
        }

        @Override
        public boolean isViewReferenced() {
            throw unsupported();
        }

        @Override
        public int getEstimatedSize() {
            throw unsupported();
        }

        @Override
        public String getExpressionStr() {
            throw unsupported();
        }

        @Override
        public boolean isRowTimestamp() {
            throw unsupported();
        }

        @Override
        public boolean isDynamic() {
            throw unsupported();
        }

        @Override
        public byte[] getColumnQualifierBytes() {
            throw unsupported();
        }

        @Override
        public boolean isNullable() {
            throw unsupported();
        }

        /** @return the Phoenix SQL data type of this column. */
        @Override
        public PDataType getDataType() {
            return pDataType;
        }

        @Override
        public Integer getMaxLength() {
            throw unsupported();
        }

        @Override
        public Integer getScale() {
            throw unsupported();
        }

        @Override
        public SortOrder getSortOrder() {
            throw unsupported();
        }
    }
}

View File

@ -1,359 +0,0 @@
package com.alibaba.datax.plugin.rdbms.util;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.Set;
import java.util.StringTokenizer;
// TODO delete it
/**
 * Pretty-printer for SQL statements (indents clauses, breaks lines at keywords).
 * Tokenizes the input on whitespace and SQL punctuation and re-emits it with
 * newlines/indentation driven by a small clause-tracking state machine.
 *
 * Fix: the double-quoted-identifier loop in {@code FormatProcess.perform()} did not
 * check {@code hasMoreTokens()}, so an unterminated {@code "} threw
 * {@link java.util.NoSuchElementException}; it now terminates like the
 * single-quote branch does.
 */
public class SqlFormatUtil {

    private static final Set<String> BEGIN_CLAUSES = new HashSet<String>();
    private static final Set<String> END_CLAUSES = new HashSet<String>();
    private static final Set<String> LOGICAL = new HashSet<String>();
    private static final Set<String> QUANTIFIERS = new HashSet<String>();
    private static final Set<String> DML = new HashSet<String>();
    private static final Set<String> MISC = new HashSet<String>();

    private static final String WHITESPACE = " \n\r\f\t";

    static {
        BEGIN_CLAUSES.add("left");
        BEGIN_CLAUSES.add("right");
        BEGIN_CLAUSES.add("inner");
        BEGIN_CLAUSES.add("outer");
        BEGIN_CLAUSES.add("group");
        BEGIN_CLAUSES.add("order");

        END_CLAUSES.add("where");
        END_CLAUSES.add("set");
        END_CLAUSES.add("having");
        END_CLAUSES.add("join");
        END_CLAUSES.add("from");
        END_CLAUSES.add("by");
        END_CLAUSES.add("join");
        END_CLAUSES.add("into");
        END_CLAUSES.add("union");

        LOGICAL.add("and");
        LOGICAL.add("or");
        LOGICAL.add("when");
        LOGICAL.add("else");
        LOGICAL.add("end");

        QUANTIFIERS.add("in");
        QUANTIFIERS.add("all");
        QUANTIFIERS.add("exists");
        QUANTIFIERS.add("some");
        QUANTIFIERS.add("any");

        DML.add("insert");
        DML.add("update");
        DML.add("delete");

        MISC.add("select");
        MISC.add("on");
    }

    static final String indentString = " ";
    static final String initial = "\n ";

    /**
     * Formats a SQL string with clause-aware line breaks and indentation.
     *
     * @param source raw SQL text
     * @return formatted SQL, starting with a newline and one indent level
     */
    public static String format(String source) {
        return new FormatProcess(source).perform();
    }

    /** One-shot formatting state machine; create per input string, call {@link #perform()} once. */
    private static class FormatProcess {
        boolean beginLine = true;
        boolean afterBeginBeforeEnd = false;
        boolean afterByOrSetOrFromOrSelect = false;
        boolean afterValues = false; // set in values() but never read
        boolean afterOn = false;
        boolean afterBetween = false;
        boolean afterInsert = false;
        int inFunction = 0;
        int parensSinceSelect = 0;
        private LinkedList<Integer> parenCounts = new LinkedList<Integer>();
        private LinkedList<Boolean> afterByOrFromOrSelects = new LinkedList<Boolean>();
        int indent = 1;

        StringBuilder result = new StringBuilder();
        StringTokenizer tokens;
        String lastToken;
        String token;
        String lcToken;

        public FormatProcess(String sql) {
            // punctuation and whitespace are returned as their own tokens
            tokens = new StringTokenizer(sql, "()+*/-=<>'`\"[]," + WHITESPACE,
                    true);
        }

        /** Consumes all tokens and returns the formatted SQL. */
        public String perform() {

            result.append(initial);

            while (tokens.hasMoreTokens()) {
                token = tokens.nextToken();
                lcToken = token.toLowerCase();

                if ("'".equals(token)) {
                    // swallow the quoted literal as one token
                    String t;
                    do {
                        t = tokens.nextToken();
                        token += t;
                    } while (!"'".equals(t) && tokens.hasMoreTokens()); // cannot
                    // handle
                    // single
                    // quotes
                } else if ("\"".equals(token)) {
                    String t;
                    do {
                        t = tokens.nextToken();
                        token += t;
                        // hasMoreTokens() guard added: an unterminated quoted identifier
                        // previously threw NoSuchElementException here
                    } while (!"\"".equals(t) && tokens.hasMoreTokens());
                }

                if (afterByOrSetOrFromOrSelect && ",".equals(token)) {
                    commaAfterByOrFromOrSelect();
                } else if (afterOn && ",".equals(token)) {
                    commaAfterOn();
                }

                else if ("(".equals(token)) {
                    openParen();
                } else if (")".equals(token)) {
                    closeParen();
                }

                else if (BEGIN_CLAUSES.contains(lcToken)) {
                    beginNewClause();
                }

                else if (END_CLAUSES.contains(lcToken)) {
                    endNewClause();
                }

                else if ("select".equals(lcToken)) {
                    select();
                }

                else if (DML.contains(lcToken)) {
                    updateOrInsertOrDelete();
                }

                else if ("values".equals(lcToken)) {
                    values();
                }

                else if ("on".equals(lcToken)) {
                    on();
                }

                else if (afterBetween && lcToken.equals("and")) {
                    misc();
                    afterBetween = false;
                }

                else if (LOGICAL.contains(lcToken)) {
                    logical();
                }

                else if (isWhitespace(token)) {
                    white();
                }

                else {
                    misc();
                }

                if (!isWhitespace(token)) {
                    lastToken = lcToken;
                }

            }
            return result.toString();
        }

        private void commaAfterOn() {
            out();
            indent--;
            newline();
            afterOn = false;
            afterByOrSetOrFromOrSelect = true;
        }

        private void commaAfterByOrFromOrSelect() {
            out();
            newline();
        }

        private void logical() {
            if ("end".equals(lcToken)) {
                indent--;
            }
            newline();
            out();
            beginLine = false;
        }

        private void on() {
            indent++;
            afterOn = true;
            newline();
            out();
            beginLine = false;
        }

        private void misc() {
            out();
            if ("between".equals(lcToken)) {
                afterBetween = true;
            }
            if (afterInsert) {
                newline();
                afterInsert = false;
            } else {
                beginLine = false;
                if ("case".equals(lcToken)) {
                    indent++;
                }
            }
        }

        private void white() {
            if (!beginLine) {
                result.append(" ");
            }
        }

        private void updateOrInsertOrDelete() {
            out();
            indent++;
            beginLine = false;
            if ("update".equals(lcToken)) {
                newline();
            }
            if ("insert".equals(lcToken)) {
                afterInsert = true;
            }
        }

        private void select() {
            out();
            indent++;
            newline();
            // remember paren depth / clause flag so a closing paren can restore them
            parenCounts.addLast(Integer.valueOf(parensSinceSelect));
            afterByOrFromOrSelects.addLast(Boolean
                    .valueOf(afterByOrSetOrFromOrSelect));
            parensSinceSelect = 0;
            afterByOrSetOrFromOrSelect = true;
        }

        private void out() {
            result.append(token);
        }

        private void endNewClause() {
            if (!afterBeginBeforeEnd) {
                indent--;
                if (afterOn) {
                    indent--;
                    afterOn = false;
                }
                newline();
            }
            out();
            if (!"union".equals(lcToken)) {
                indent++;
            }
            newline();
            afterBeginBeforeEnd = false;
            afterByOrSetOrFromOrSelect = "by".equals(lcToken)
                    || "set".equals(lcToken) || "from".equals(lcToken);
        }

        private void beginNewClause() {
            if (!afterBeginBeforeEnd) {
                if (afterOn) {
                    indent--;
                    afterOn = false;
                }
                indent--;
                newline();
            }
            out();
            beginLine = false;
            afterBeginBeforeEnd = true;
        }

        private void values() {
            indent--;
            newline();
            out();
            indent++;
            newline();
            afterValues = true;
        }

        private void closeParen() {
            parensSinceSelect--;
            if (parensSinceSelect < 0) {
                // this paren closes a subselect: restore outer select's state
                indent--;
                parensSinceSelect = parenCounts.removeLast().intValue();
                afterByOrSetOrFromOrSelect = afterByOrFromOrSelects
                        .removeLast().booleanValue();
            }
            if (inFunction > 0) {
                inFunction--;
                out();
            } else {
                if (!afterByOrSetOrFromOrSelect) {
                    indent--;
                    newline();
                }
                out();
            }
            beginLine = false;
        }

        private void openParen() {
            if (isFunctionName(lastToken) || inFunction > 0) {
                inFunction++;
            }
            beginLine = false;
            if (inFunction > 0) {
                out();
            } else {
                out();
                if (!afterByOrSetOrFromOrSelect) {
                    indent++;
                    newline();
                    beginLine = true;
                }
            }
            parensSinceSelect++;
        }

        private static boolean isFunctionName(String tok) {
            final char begin = tok.charAt(0);
            final boolean isIdentifier = Character.isJavaIdentifierStart(begin)
                    || '"' == begin;
            return isIdentifier && !LOGICAL.contains(tok)
                    && !END_CLAUSES.contains(tok) && !QUANTIFIERS.contains(tok)
                    && !DML.contains(tok) && !MISC.contains(tok);
        }

        private static boolean isWhitespace(String token) {
            return WHITESPACE.indexOf(token) >= 0;
        }

        private void newline() {
            result.append("\n");
            for (int i = 0; i < indent; i++) {
                result.append(indentString);
            }
            beginLine = true;
        }
    }
}