mirror of
https://github.com/apache/sqoop.git
synced 2025-05-04 08:50:50 +08:00
SQOOP-317. Allow working with tables owned by other users in Oracle.
git-svn-id: https://svn.apache.org/repos/asf/incubator/sqoop/trunk@1159491 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
6c6d37214f
commit
01a029949a
@ -68,12 +68,12 @@ public class OracleManager extends GenericJdbcManager {
|
||||
"SELECT USERNAME FROM DBA_USERS";
|
||||
|
||||
/**
|
||||
* Query to list all tables of the current schema. Even if the user has
|
||||
* DBA privileges which allows other schemas to be visible, we will limit this
|
||||
* query to only the current schema.
|
||||
* Query to list all tables visible to the current user. Note that this list
|
||||
* does not identify the table owners which is required in order to
|
||||
* ensure that the table can be operated on for import/export purposes.
|
||||
*/
|
||||
public static final String QUERY_LIST_TABLES =
|
||||
"SELECT TABLE_NAME FROM USER_TABLES";
|
||||
"SELECT TABLE_NAME FROM ALL_TABLES";
|
||||
|
||||
/**
|
||||
* Query to list all columns of the given table. Even if the user has the
|
||||
@ -81,18 +81,20 @@ public class OracleManager extends GenericJdbcManager {
|
||||
* limit it to explore tables only from within the active schema.
|
||||
*/
|
||||
public static final String QUERY_COLUMNS_FOR_TABLE =
|
||||
"SELECT COLUMN_NAME FROM USER_TAB_COLUMNS WHERE TABLE_NAME = ?";
|
||||
"SELECT COLUMN_NAME FROM ALL_TAB_COLUMNS WHERE "
|
||||
+ "OWNER = ? AND TABLE_NAME = ?";
|
||||
|
||||
/**
|
||||
* Query to find the primary key column name for a given table. This query
|
||||
* is restricted to the current schema.
|
||||
*/
|
||||
public static final String QUERY_PRIMARY_KEY_FOR_TABLE =
|
||||
"SELECT USER_CONS_COLUMNS.COLUMN_NAME FROM USER_CONS_COLUMNS, "
|
||||
+ "USER_CONSTRAINTS WHERE USER_CONS_COLUMNS.CONSTRAINT_NAME = "
|
||||
+ "USER_CONSTRAINTS.CONSTRAINT_NAME AND "
|
||||
+ "USER_CONSTRAINTS.CONSTRAINT_TYPE = 'P' AND "
|
||||
+ "USER_CONS_COLUMNS.TABLE_NAME = ?";
|
||||
"SELECT ALL_CONS_COLUMNS.COLUMN_NAME FROM ALL_CONS_COLUMNS, "
|
||||
+ "ALL_CONSTRAINTS WHERE ALL_CONS_COLUMNS.CONSTRAINT_NAME = "
|
||||
+ "ALL_CONSTRAINTS.CONSTRAINT_NAME AND "
|
||||
+ "ALL_CONSTRAINTS.CONSTRAINT_TYPE = 'P' AND "
|
||||
+ "ALL_CONS_COLUMNS.TABLE_NAME = ? AND "
|
||||
+ "ALL_CONS_COLUMNS.OWNER = ?";
|
||||
|
||||
// driver class to ensure is loaded when making db connection.
|
||||
private static final String DRIVER_CLASS = "oracle.jdbc.OracleDriver";
|
||||
@ -238,7 +240,11 @@ public void close() throws SQLException {
|
||||
|
||||
protected String getColNamesQuery(String tableName) {
|
||||
// SqlManager uses "tableName AS t" which doesn't work in Oracle.
|
||||
return "SELECT t.* FROM " + escapeTableName(tableName) + " t WHERE 1=0";
|
||||
String query = "SELECT t.* FROM " + escapeTableName(tableName)
|
||||
+ " t WHERE 1=0";
|
||||
|
||||
LOG.debug("Using column names query: " + query);
|
||||
return query;
|
||||
}
|
||||
|
||||
/**
|
||||
@ -652,12 +658,23 @@ public String[] getColumnNames(String tableName) {
|
||||
ResultSet rset = null;
|
||||
List<String> columns = new ArrayList<String>();
|
||||
|
||||
String tableOwner = this.options.getUsername();
|
||||
String shortTableName = tableName;
|
||||
int qualifierIndex = tableName.indexOf('.');
|
||||
if (qualifierIndex != -1) {
|
||||
tableOwner = tableName.substring(0, qualifierIndex);
|
||||
shortTableName = tableName.substring(qualifierIndex + 1);
|
||||
}
|
||||
|
||||
try {
|
||||
conn = getConnection();
|
||||
|
||||
pStmt = conn.prepareStatement(QUERY_COLUMNS_FOR_TABLE,
|
||||
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
||||
pStmt.setString(1, tableName);
|
||||
|
||||
pStmt.setString(1, tableOwner);
|
||||
|
||||
pStmt.setString(2, shortTableName);
|
||||
rset = pStmt.executeQuery();
|
||||
|
||||
while (rset.next()) {
|
||||
@ -704,12 +721,21 @@ public String getPrimaryKey(String tableName) {
|
||||
ResultSet rset = null;
|
||||
List<String> columns = new ArrayList<String>();
|
||||
|
||||
String tableOwner = this.options.getUsername();
|
||||
String shortTableName = tableName;
|
||||
int qualifierIndex = tableName.indexOf('.');
|
||||
if (qualifierIndex != -1) {
|
||||
tableOwner = tableName.substring(0, qualifierIndex);
|
||||
shortTableName = tableName.substring(qualifierIndex + 1);
|
||||
}
|
||||
|
||||
try {
|
||||
conn = getConnection();
|
||||
|
||||
pStmt = conn.prepareStatement(QUERY_PRIMARY_KEY_FOR_TABLE,
|
||||
ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY);
|
||||
pStmt.setString(1, tableName);
|
||||
pStmt.setString(1, shortTableName);
|
||||
pStmt.setString(2, tableOwner);
|
||||
rset = pStmt.executeQuery();
|
||||
|
||||
while (rset.next()) {
|
||||
|
@ -63,10 +63,22 @@
|
||||
* into Apache's tree for licensing reasons).
|
||||
*
|
||||
* To set up your test environment:
|
||||
* Install Oracle Express Edition 10.2.0.
|
||||
* Create a database user named SQOOPTEST
|
||||
* Set the user's password to '12345'
|
||||
* Grant the user the CREATE TABLE privilege
|
||||
* <ol>
|
||||
* <li>Install Oracle 10.2.0 or later</li>
|
||||
* <li>Create a database user named SQOOPTEST with password '12345' having
|
||||
* CONNECT and RESOURCE privileges.</li>
|
||||
* <li>Create a database user named SQOOPTEST2 with password 'abcdef' having
|
||||
* CONNECT and RESOURCE privileges</li>
|
||||
* </ol>
|
||||
*
|
||||
* One way to do this is to connect to the database instance via SQL*Plus client
|
||||
* as the SYSTEM user and execute the following commands:
|
||||
* <ul>
|
||||
* <li>CREATE USER SQOOPTEST identified by 12345;</li>
|
||||
* <li>GRANT CONNECT, RESOURCE to SQOOPTEST;</li>
|
||||
* <li>CREATE USER SQOOPTEST2 identified by ABCDEF;</li>
|
||||
* <li>GRANT CONNECT, RESOURCE to SQOOPTEST2;</li>
|
||||
* </ul>
|
||||
*
|
||||
* Oracle XE does a poor job of cleaning up connections in a timely fashion.
|
||||
* Too many connections too quickly will be rejected, because XE will gc the
|
||||
@ -86,6 +98,62 @@ public class OracleManagerTest extends ImportJobTestCase {
|
||||
OracleManagerTest.class.getName());
|
||||
|
||||
static final String TABLE_NAME = "EMPLOYEES";
|
||||
static final String SECONDARY_TABLE_NAME = "CUSTOMER";
|
||||
static final String QUALIFIED_SECONDARY_TABLE_NAME =
|
||||
OracleUtils.ORACLE_SECONDARY_USER_NAME + "." + SECONDARY_TABLE_NAME;
|
||||
|
||||
/*
|
||||
* Array containing SQL statements necessary to create and populate
|
||||
* the main test table.
|
||||
*/
|
||||
private static final String[] MAIN_TABLE_SQL_STMTS = new String[] {
|
||||
"CREATE TABLE " + TABLE_NAME + " ("
|
||||
+ "id INT NOT NULL, "
|
||||
+ "name VARCHAR2(24) NOT NULL, "
|
||||
+ "start_date DATE, "
|
||||
+ "salary FLOAT, "
|
||||
+ "dept VARCHAR2(32), "
|
||||
+ "timestamp_tz TIMESTAMP WITH TIME ZONE, "
|
||||
+ "timestamp_ltz TIMESTAMP WITH LOCAL TIME ZONE, "
|
||||
+ "PRIMARY KEY (id))",
|
||||
"INSERT INTO " + TABLE_NAME + " VALUES("
|
||||
+ "1,'Aaron',to_date('2009-05-14','yyyy-mm-dd'),"
|
||||
+ "1000000.00,'engineering','29-DEC-09 12.00.00.000000000 PM',"
|
||||
+ "'29-DEC-09 12.00.00.000000000 PM')",
|
||||
"INSERT INTO " + TABLE_NAME + " VALUES("
|
||||
+ "2,'Bob',to_date('2009-04-20','yyyy-mm-dd'),"
|
||||
+ "400.00,'sales','30-DEC-09 12.00.00.000000000 PM',"
|
||||
+ "'30-DEC-09 12.00.00.000000000 PM')",
|
||||
"INSERT INTO " + TABLE_NAME + " VALUES("
|
||||
+ "3,'Fred',to_date('2009-01-23','yyyy-mm-dd'),15.00,"
|
||||
+ "'marketing','31-DEC-09 12.00.00.000000000 PM',"
|
||||
+ "'31-DEC-09 12.00.00.000000000 PM')",
|
||||
};
|
||||
|
||||
/*
|
||||
* Array containing SQL statements necessary to create, populate and
|
||||
* provision the secondary test table.
|
||||
*/
|
||||
private static final String[] SECONDARY_TABLE_SQL_STMTS = new String[] {
|
||||
"CREATE TABLE " + SECONDARY_TABLE_NAME + " ("
|
||||
+ "id INT NOT NULL, "
|
||||
+ "name VARCHAR2(24) NOT NULL, "
|
||||
+ "PRIMARY KEY (id))",
|
||||
"INSERT INTO " + SECONDARY_TABLE_NAME + " VALUES("
|
||||
+ "1,'MercuryCorp')",
|
||||
"INSERT INTO " + SECONDARY_TABLE_NAME + " VALUES("
|
||||
+ "2,'VenusCorp')",
|
||||
"INSERT INTO " + SECONDARY_TABLE_NAME + " VALUES("
|
||||
+ "3,'EarthCorp')",
|
||||
"INSERT INTO " + SECONDARY_TABLE_NAME + " VALUES("
|
||||
+ "4,'MarsCorp')",
|
||||
"INSERT INTO " + SECONDARY_TABLE_NAME + " VALUES("
|
||||
+ "5,'JupiterCorp')",
|
||||
"INSERT INTO " + SECONDARY_TABLE_NAME + " VALUES("
|
||||
+ "6,'SaturnCorp')",
|
||||
"GRANT SELECT, INSERT ON " + SECONDARY_TABLE_NAME + " TO "
|
||||
+ OracleUtils.ORACLE_USER_NAME,
|
||||
};
|
||||
|
||||
// instance variables populated during setUp, used during tests
|
||||
private OracleManager manager;
|
||||
@ -95,55 +163,18 @@ protected boolean useHsqldbTestServer() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
super.setUp();
|
||||
|
||||
SqoopOptions options = new SqoopOptions(OracleUtils.CONNECT_STRING,
|
||||
TABLE_NAME);
|
||||
OracleUtils.setOracleAuth(options);
|
||||
|
||||
manager = new OracleManager(options);
|
||||
options.getConf().set("oracle.sessionTimeZone", "US/Pacific");
|
||||
|
||||
// Drop the existing table, if there is one.
|
||||
try {
|
||||
OracleUtils.dropTable(TABLE_NAME, manager);
|
||||
} catch (SQLException sqlE) {
|
||||
fail("Could not drop table " + TABLE_NAME + ": " + sqlE);
|
||||
}
|
||||
|
||||
private void executeUpdates(OracleManager mgr, String[] sqlStmts) {
|
||||
Connection connection = null;
|
||||
Statement st = null;
|
||||
|
||||
try {
|
||||
connection = manager.getConnection();
|
||||
connection = mgr.getConnection();
|
||||
connection.setAutoCommit(false);
|
||||
st = connection.createStatement();
|
||||
|
||||
// create the database table and populate it with data.
|
||||
st.executeUpdate("CREATE TABLE " + TABLE_NAME + " ("
|
||||
+ "id INT NOT NULL, "
|
||||
+ "name VARCHAR2(24) NOT NULL, "
|
||||
+ "start_date DATE, "
|
||||
+ "salary FLOAT, "
|
||||
+ "dept VARCHAR2(32), "
|
||||
+ "timestamp_tz TIMESTAMP WITH TIME ZONE, "
|
||||
+ "timestamp_ltz TIMESTAMP WITH LOCAL TIME ZONE, "
|
||||
+ "PRIMARY KEY (id))");
|
||||
|
||||
st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
|
||||
+ "1,'Aaron',to_date('2009-05-14','yyyy-mm-dd'),"
|
||||
+ "1000000.00,'engineering','29-DEC-09 12.00.00.000000000 PM',"
|
||||
+ "'29-DEC-09 12.00.00.000000000 PM')");
|
||||
st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
|
||||
+ "2,'Bob',to_date('2009-04-20','yyyy-mm-dd'),"
|
||||
+ "400.00,'sales','30-DEC-09 12.00.00.000000000 PM',"
|
||||
+ "'30-DEC-09 12.00.00.000000000 PM')");
|
||||
st.executeUpdate("INSERT INTO " + TABLE_NAME + " VALUES("
|
||||
+ "3,'Fred',to_date('2009-01-23','yyyy-mm-dd'),15.00,"
|
||||
+ "'marketing','31-DEC-09 12.00.00.000000000 PM',"
|
||||
+ "'31-DEC-09 12.00.00.000000000 PM')");
|
||||
for (String sql : sqlStmts) {
|
||||
st.executeUpdate(sql);
|
||||
}
|
||||
connection.commit();
|
||||
} catch (SQLException sqlE) {
|
||||
LOG.error("Encountered SQL Exception: " + sqlE);
|
||||
@ -164,6 +195,53 @@ public void setUp() {
|
||||
}
|
||||
}
|
||||
|
||||
private void provisionSecondaryTable() {
|
||||
SqoopOptions options = new SqoopOptions(OracleUtils.CONNECT_STRING,
|
||||
SECONDARY_TABLE_NAME);
|
||||
OracleUtils.setOracleSecondaryUserAuth(options);
|
||||
|
||||
OracleManager mgr = new OracleManager(options);
|
||||
|
||||
// Drop the existing table if there is one
|
||||
try {
|
||||
OracleUtils.dropTable(SECONDARY_TABLE_NAME, mgr);
|
||||
} catch (SQLException sqlE) {
|
||||
fail("Could not drop table " + SECONDARY_TABLE_NAME + ": " + sqlE);
|
||||
}
|
||||
|
||||
executeUpdates(mgr, SECONDARY_TABLE_SQL_STMTS);
|
||||
|
||||
try {
|
||||
mgr.close();
|
||||
} catch (SQLException sqlE) {
|
||||
fail("Failed to close secondary manager instance : " + sqlE);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
@Before
|
||||
public void setUp() {
|
||||
super.setUp();
|
||||
|
||||
provisionSecondaryTable();
|
||||
|
||||
SqoopOptions options = new SqoopOptions(OracleUtils.CONNECT_STRING,
|
||||
TABLE_NAME);
|
||||
OracleUtils.setOracleAuth(options);
|
||||
|
||||
manager = new OracleManager(options);
|
||||
options.getConf().set("oracle.sessionTimeZone", "US/Pacific");
|
||||
|
||||
// Drop the existing table, if there is one.
|
||||
try {
|
||||
OracleUtils.dropTable(TABLE_NAME, manager);
|
||||
} catch (SQLException sqlE) {
|
||||
fail("Could not drop table " + TABLE_NAME + ": " + sqlE);
|
||||
}
|
||||
|
||||
executeUpdates(manager, MAIN_TABLE_SQL_STMTS);
|
||||
}
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
super.tearDown();
|
||||
@ -175,7 +253,11 @@ public void tearDown() {
|
||||
}
|
||||
}
|
||||
|
||||
private String [] getArgv() {
|
||||
private String[] getArgv() {
|
||||
return getArgv(TABLE_NAME);
|
||||
}
|
||||
|
||||
private String [] getArgv(String tableName) {
|
||||
ArrayList<String> args = new ArrayList<String>();
|
||||
|
||||
CommonArgs.addHadoopFlags(args);
|
||||
@ -184,7 +266,7 @@ public void tearDown() {
|
||||
args.add("oracle.sessionTimeZone=US/Pacific");
|
||||
|
||||
args.add("--table");
|
||||
args.add(TABLE_NAME);
|
||||
args.add(tableName);
|
||||
args.add("--warehouse-dir");
|
||||
args.add(getWarehouseDir());
|
||||
args.add("--connect");
|
||||
@ -199,6 +281,46 @@ public void tearDown() {
|
||||
return args.toArray(new String[0]);
|
||||
}
|
||||
|
||||
private void runSecondaryTableTest(String [] expectedResults)
|
||||
throws IOException {
|
||||
Path warehousePath = new Path(this.getWarehouseDir());
|
||||
Path tablePath = new Path(warehousePath, QUALIFIED_SECONDARY_TABLE_NAME);
|
||||
Path filePath = new Path(tablePath, "part-m-00000");
|
||||
|
||||
File tableFile = new File(tablePath.toString());
|
||||
if (tableFile.exists() && tableFile.isDirectory()) {
|
||||
// remove the directory before running the import
|
||||
FileListing.recursiveDeleteDir(tableFile);
|
||||
}
|
||||
|
||||
String [] argv = getArgv(QUALIFIED_SECONDARY_TABLE_NAME);
|
||||
|
||||
try {
|
||||
runImport(argv);
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Got IOException during import: " + ioe.toString());
|
||||
ioe.printStackTrace();
|
||||
fail(ioe.toString());
|
||||
}
|
||||
|
||||
File f = new File(filePath.toString());
|
||||
assertTrue("Could not find imported data file", f.exists());
|
||||
BufferedReader r = null;
|
||||
try {
|
||||
// Read through the file and make sure it's all there.
|
||||
r = new BufferedReader(new InputStreamReader(new FileInputStream(f)));
|
||||
for (String expectedLine : expectedResults) {
|
||||
compareRecords(expectedLine, r.readLine());
|
||||
}
|
||||
} catch (IOException ioe) {
|
||||
LOG.error("Got IOException verifying results: " + ioe.toString());
|
||||
ioe.printStackTrace();
|
||||
fail(ioe.toString());
|
||||
} finally {
|
||||
IOUtils.closeStream(r);
|
||||
}
|
||||
}
|
||||
|
||||
private void runOracleTest(String [] expectedResults) throws IOException {
|
||||
|
||||
Path warehousePath = new Path(this.getWarehouseDir());
|
||||
@ -259,6 +381,20 @@ public void testOracleImport() throws IOException {
|
||||
runOracleTest(expectedResults);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSecondaryTableImport() throws IOException {
|
||||
|
||||
String [] expectedResults = {
|
||||
"1,MercuryCorp",
|
||||
"2,VenusCorp",
|
||||
"3,EarthCorp",
|
||||
"4,MarsCorp",
|
||||
"5,JupiterCorp",
|
||||
"6,SaturnCorp",
|
||||
};
|
||||
runSecondaryTableTest(expectedResults);
|
||||
}
|
||||
|
||||
/**
|
||||
* Compare two lines. Normalize the dates we receive based on the expected
|
||||
* time zone.
|
||||
|
@ -43,6 +43,9 @@ public final class OracleUtils {
|
||||
public static final String ORACLE_USER_NAME = "SQOOPTEST";
|
||||
public static final String ORACLE_USER_PASS = "12345";
|
||||
|
||||
public static final String ORACLE_SECONDARY_USER_NAME = "SQOOPTEST2";
|
||||
public static final String ORACLE_SECONDARY_USER_PASS = "ABCDEF";
|
||||
|
||||
private OracleUtils() { }
|
||||
|
||||
public static void setOracleAuth(SqoopOptions options) {
|
||||
@ -50,6 +53,11 @@ public static void setOracleAuth(SqoopOptions options) {
|
||||
options.setPassword(ORACLE_USER_PASS);
|
||||
}
|
||||
|
||||
public static void setOracleSecondaryUserAuth(SqoopOptions options) {
|
||||
options.setUsername(ORACLE_SECONDARY_USER_NAME);
|
||||
options.setPassword(ORACLE_SECONDARY_USER_PASS);
|
||||
}
|
||||
|
||||
/**
|
||||
* Drop a table if it exists.
|
||||
*/
|
||||
|
Loading…
Reference in New Issue
Block a user