From 041d42fd552f3cb0accff08b3a1ac180ff165c29 Mon Sep 17 00:00:00 2001
From: Gwen Shapira
Date: Fri, 23 Jan 2015 10:03:11 -0800
Subject: [PATCH] SQOOP-1572: Sqoop2: Duplicate Column Name in Multiple Tables Import

(Abraham Elmahrek via Gwen Shapira)
---
 .../jdbc/GenericJdbcFromInitializer.java           |  4 +-
 docs/src/site/sphinx/Connectors.rst                |  3 +-
 .../jdbc/generic/FromRDBMSToHDFSTest.java          | 98 +++++++++++++++++++
 3 files changed, 102 insertions(+), 3 deletions(-)

diff --git a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
index ff42949f..425b2cc0 100644
--- a/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
+++ b/connector/connector-generic-jdbc/src/main/java/org/apache/sqoop/connector/jdbc/GenericJdbcFromInitializer.java
@@ -85,9 +85,9 @@ public Schema getSchema(InitializerContext context, LinkConfiguration linkConfig
       rsmt = rs.getMetaData();
       for (int i = 1 ; i <= rsmt.getColumnCount(); i++) {
-        String columnName = rsmt.getColumnName(i);
+        String columnName = rsmt.getColumnLabel(i);
         if (StringUtils.isEmpty(columnName)) {
-          columnName = rsmt.getColumnLabel(i);
+          columnName = rsmt.getColumnName(i);
           if (StringUtils.isEmpty(columnName)) {
             columnName = "Column " + i;
           }
         }
diff --git a/docs/src/site/sphinx/Connectors.rst b/docs/src/site/sphinx/Connectors.rst
index bcc5b43a..f67e187d 100644
--- a/docs/src/site/sphinx/Connectors.rst
+++ b/docs/src/site/sphinx/Connectors.rst
@@ -74,7 +74,7 @@ Inputs associated with the Job configuration for the FROM direction include:
 |                             |         | *Optional*. See note below.                                             |                                             |
 +-----------------------------+---------+-------------------------------------------------------------------------+---------------------------------------------+
 | Table SQL statement         | String  | The SQL statement used to perform a **free form query**.                | ``SELECT COUNT(*) FROM test ${CONDITIONS}`` |
-|                             |         | *Optional*. See note below.                                             |                                             |
+|                             |         | *Optional*. See notes below.                                            |                                             |
 +-----------------------------+---------+-------------------------------------------------------------------------+---------------------------------------------+
 | Table column names          | String  | Columns to extract from the JDBC data source.                           | col1,col2                                   |
 |                             |         | *Optional* Comma separated list of columns.                             |                                             |
@@ -94,6 +94,7 @@ Inputs associated with the Job configuration for the FROM direction include:
 
 1. *Table name* and *Table SQL statement* are mutually exclusive. If *Table name* is provided, the *Table SQL statement* should not be provided. If *Table SQL statement* is provided then *Table name* should not be provided.
 2. *Table column names* should be provided only if *Table name* is provided.
+3. If there are columns with similar names, column aliases are required. For example: ``SELECT table1.id as "i", table2.id as "j" FROM table1 INNER JOIN table2 ON table1.id = table2.id``.
 
 **TO Job Configuration**
 ++++++++++++++++++++++++
diff --git a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
index 85b9d2de..aa9f212e 100644
--- a/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
+++ b/test/src/test/java/org/apache/sqoop/integration/connector/jdbc/generic/FromRDBMSToHDFSTest.java
@@ -122,4 +122,102 @@ public void testColumns() throws Exception {
     // Clean up testing table
     dropTable();
   }
+
+  @Test
+  public void testSql() throws Exception {
+    createAndLoadTableCities();
+
+    // RDBMS link
+    MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsLink);
+    saveLink(rdbmsLink);
+
+    // HDFS link
+    MLink hdfsLink = getClient().createLink("hdfs-connector");
+    saveLink(hdfsLink);
+
+    // Job creation
+    MJob job = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());
+
+    // Connector values
+    MConfigList configs = job.getJobConfig(Direction.FROM);
+    configs.getStringInput("fromJobConfig.sql").setValue("SELECT " + provider.escapeColumnName("id") +
+      " FROM " + provider.escapeTableName(getTableName()) + " WHERE ${CONDITIONS}");
+    configs.getStringInput("fromJobConfig.partitionColumn").setValue(provider.escapeColumnName("id"));
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+    saveJob(job);
+
+    MSubmission submission = getClient().startJob(job.getPersistenceId());
+    assertTrue(submission.getStatus().isRunning());
+
+    // Wait until the job finish - this active waiting will be removed once
+    // Sqoop client API will get blocking support.
+    do {
+      Thread.sleep(5000);
+      submission = getClient().getJobStatus(job.getPersistenceId());
+    } while(submission.getStatus().isRunning());
+
+    // Assert correct output
+    assertTo(
+        "1",
+        "2",
+        "3",
+        "4"
+    );
+
+    // Clean up testing table
+    dropTable();
+  }
+
+  @Test
+  public void testDuplicateColumns() throws Exception {
+    createAndLoadTableCities();
+
+    // RDBMS link
+    MLink rdbmsLink = getClient().createLink("generic-jdbc-connector");
+    fillRdbmsLinkConfig(rdbmsLink);
+    saveLink(rdbmsLink);
+
+    // HDFS link
+    MLink hdfsLink = getClient().createLink("hdfs-connector");
+    saveLink(hdfsLink);
+
+    // Job creation
+    MJob job = getClient().createJob(rdbmsLink.getPersistenceId(), hdfsLink.getPersistenceId());
+
+    // Connector values
+    String partitionColumn = provider.escapeTableName(getTableName()) + "." + provider.escapeColumnName("id");
+    MConfigList configs = job.getJobConfig(Direction.FROM);
+    configs.getStringInput("fromJobConfig.sql").setValue(
+      "SELECT " + provider.escapeColumnName("id") + " as " + provider.escapeColumnName("i") + ", "
+        + provider.escapeColumnName("id") + " as " + provider.escapeColumnName("j")
+        + " FROM " + provider.escapeTableName(getTableName()) + " WHERE ${CONDITIONS}");
+    configs.getStringInput("fromJobConfig.partitionColumn").setValue(partitionColumn);
+    configs.getStringInput("fromJobConfig.boundaryQuery").setValue(
+      "SELECT MIN(" + partitionColumn + "), MAX(" + partitionColumn + ") FROM "
+        + provider.escapeTableName(getTableName()));
+    fillHdfsToConfig(job, ToFormat.TEXT_FILE);
+    saveJob(job);
+
+    MSubmission submission = getClient().startJob(job.getPersistenceId());
+    assertTrue(submission.getStatus().isRunning());
+
+    // Wait until the job finish - this active waiting will be removed once
+    // Sqoop client API will get blocking support.
+    do {
+      Thread.sleep(5000);
+      submission = getClient().getJobStatus(job.getPersistenceId());
+    } while(submission.getStatus().isRunning());
+
+    // Assert correct output
+    assertTo(
+        "1,1",
+        "2,2",
+        "3,3",
+        "4,4"
+    );
+
+    // Clean up testing table
+    dropTable();
+  }
 }
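Note on the core change above: JDBC's ResultSetMetaData.getColumnLabel() returns the alias given in the SELECT list (or the plain column name when no alias is present), while getColumnName() returns the underlying column name, so two joined tables that both expose an "id" column only get distinct schema names when the label is consulted first, as this patch does. Below is a minimal standalone sketch of that difference; the HSQLDB in-memory database, the t1/t2 tables and the ColumnLabelDemo class are illustrative assumptions and are not part of the patch.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.ResultSetMetaData;
    import java.sql.Statement;

    public class ColumnLabelDemo {
      public static void main(String[] args) throws Exception {
        // Assumes the HSQLDB driver is on the classpath; any JDBC source with two
        // tables that both have an "id" column would illustrate the same point.
        try (Connection conn = DriverManager.getConnection("jdbc:hsqldb:mem:demo");
             Statement stmt = conn.createStatement()) {
          stmt.execute("CREATE TABLE t1 (id INT)");
          stmt.execute("CREATE TABLE t2 (id INT)");
          try (ResultSet rs = stmt.executeQuery(
              "SELECT t1.id AS i, t2.id AS j FROM t1, t2")) {
            ResultSetMetaData rsmt = rs.getMetaData();
            for (int i = 1; i <= rsmt.getColumnCount(); i++) {
              // getColumnLabel(i) reports the aliases, giving the two columns
              // distinct names; getColumnName(i) may report the underlying name
              // ("id") for both, which is what produced the duplicate-column
              // failure this patch addresses.
              System.out.println(rsmt.getColumnLabel(i) + " / " + rsmt.getColumnName(i));
            }
          }
        }
      }
    }

With a driver that reports underlying names (HSQLDB does by default), this typically prints the alias next to the shared column name for each of the two columns, which is why the initializer now prefers the label and falls back to getColumnName() only when the label is empty.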