5
0
mirror of https://github.com/apache/sqoop.git synced 2025-05-04 12:52:24 +08:00

SQOOP-1572: Sqoop2: Duplicate Column Name in Multiple Tables Import

(Abraham Elmahrek via Gwen Shapira)
This commit is contained in:
Gwen Shapira 2015-01-23 10:03:11 -08:00
parent d95e7537a3
commit 041d42fd55
3 changed files with 102 additions and 3 deletions

View File

@ -85,9 +85,9 @@ public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig
rsmt = rs.getMetaData();
for (int i = 1 ; i <= rsmt.getColumnCount(); i++) {
String columnName = rsmt.getColumnName(i);
String columnName = rsmt.getColumnLabel(i);
if (StringUtils.isEmpty(columnName)) {
columnName = rsmt.getColumnLabel(i);
columnName = rsmt.getColumnName(i);
if (StringUtils.isEmpty(columnName)) {
columnName = "Column " + i;
}

View File

@ -74,7 +74,7 @@ Inputs associated with the Job configuration for the FROM direction include:
| | | *Optional*. See note below. | |
+-----------------------------+---------+-------------------------------------------------------------------------+---------------------------------------------+
| Table SQL statement | String | The SQL statement used to perform a **free form query**. | ``SELECT COUNT(*) FROM test ${CONDITIONS}`` |
| | | *Optional*. See note below. | |
| | | *Optional*. See notes below. | |
+-----------------------------+---------+-------------------------------------------------------------------------+---------------------------------------------+
| Table column names | String | Columns to extract from the JDBC data source. | col1,col2 |
| | | *Optional* Comma separated list of columns. | |
@ -94,6 +94,7 @@ Inputs associated with the Job configuration for the FROM direction include:
1. *Table name* and *Table SQL statement* are mutually exclusive. If *Table name* is provided, the *Table SQL statement* should not be provided. If *Table SQL statement* is provided then *Table name* should not be provided.
2. *Table column names* should be provided only if *Table name* is provided.
3. If there are columns with identical names, column aliases are required. For example: ``SELECT table1.id as "i", table2.id as "j" FROM table1 INNER JOIN table2 ON table1.id = table2.id``.

**TO Job Configuration**
++++++++++++++++++++++++

View File

@ -122,4 +122,102 @@ public void testColumns() throws Exception {
// Clean up testing table
dropTable();
}
@Test
public void testSql() throws Exception {
  createAndLoadTableCities();

  // Set up the RDBMS side of the transfer.
  MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
  fillRdbmsLinkConfig(rdbmsLink);
  saveLink(rdbmsLink);

  // Set up the HDFS side of the transfer.
  MLink hdfsLink = getClient().createLink("hdfs-connector");
  saveLink(hdfsLink);

  // Build a job that imports via a free-form query instead of a table name.
  MJob job = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());
  MConfigList fromConfigs = job.getJobConfig(Direction.FROM);
  String query = "SELECT " + provider.escapeColumnName("id")
      + " FROM " + provider.escapeTableName(getTableName()) + " WHERE ${CONDITIONS}";
  fromConfigs.getStringInput("fromJobConfig.sql").setValue(query);
  fromConfigs.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
  fillHdfsToConfig(job, ToFormat.TEXT_FILE);
  saveJob(job);

  MSubmission submission = getClient().startJob(job.getPersistenceId());
  assertTrue(submission.getStatus().isRunning());

  // Poll until the job finishes - this active waiting will be removed once
  // the Sqoop client API gets blocking support.
  do {
    Thread.sleep(5000);
    submission = getClient().getJobStatus(job.getPersistenceId());
  } while (submission.getStatus().isRunning());

  // Verify that exactly the "id" column of the four seeded rows was written.
  assertTo(
      "1",
      "2",
      "3",
      "4"
  );

  // Clean up testing table
  dropTable();
}
@Test
public void testDuplicateColumns() throws Exception {
  createAndLoadTableCities();

  // Set up the RDBMS side of the transfer.
  MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
  fillRdbmsLinkConfig(rdbmsLink);
  saveLink(rdbmsLink);

  // Set up the HDFS side of the transfer.
  MLink hdfsLink = getClient().createLink("hdfs-connector");
  saveLink(hdfsLink);

  // Build a job whose query selects the same column twice under different
  // aliases, to exercise duplicate-column-name handling.
  MJob job = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());
  String partitionColumn = provider.escapeTableName(getTableName()) + "." + provider.escapeColumnName("id");
  String query =
      "SELECT " + provider.escapeColumnName("id") + " as " + provider.escapeColumnName("i") + ", "
          + provider.escapeColumnName("id") + " as " + provider.escapeColumnName("j")
          + " FROM " + provider.escapeTableName(getTableName()) + " WHERE ${CONDITIONS}";
  String boundaryQuery =
      "SELECT MIN(" + partitionColumn + "), MAX(" + partitionColumn + ") FROM "
          + provider.escapeTableName(getTableName());
  MConfigList fromConfigs = job.getJobConfig(Direction.FROM);
  fromConfigs.getStringInput("fromJobConfig.sql").setValue(query);
  fromConfigs.getStringInput("fromJobConfig.partitionColumn").setValue(partitionColumn);
  fromConfigs.getStringInput("fromJobConfig.boundaryQuery").setValue(boundaryQuery);
  fillHdfsToConfig(job, ToFormat.TEXT_FILE);
  saveJob(job);

  MSubmission submission = getClient().startJob(job.getPersistenceId());
  assertTrue(submission.getStatus().isRunning());

  // Poll until the job finishes - this active waiting will be removed once
  // the Sqoop client API gets blocking support.
  do {
    Thread.sleep(5000);
    submission = getClient().getJobStatus(job.getPersistenceId());
  } while (submission.getStatus().isRunning());

  // Verify both aliased copies of "id" arrived for each of the four rows.
  assertTo(
      "1,1",
      "2,2",
      "3,3",
      "4,4"
  );

  // Clean up testing table
  dropTable();
}
}