diff --git a/src/java/org/apache/sqoop/manager/oracle/OraOopDataDrivenDBInputFormat.java b/src/java/org/apache/sqoop/manager/oracle/OraOopDataDrivenDBInputFormat.java index 13f05d52..3e88d049 100644 --- a/src/java/org/apache/sqoop/manager/oracle/OraOopDataDrivenDBInputFormat.java +++ b/src/java/org/apache/sqoop/manager/oracle/OraOopDataDrivenDBInputFormat.java @@ -136,8 +136,16 @@ public List getSplits(JobContext jobContext) throws IOException { } } + connection.commit(); } catch (SQLException ex) { + try { + connection.rollback(); + } catch (SQLException e) { + LOG.error("Cannot rollback transaction.", e); + } throw new IOException(ex); + } finally { + closeConnection(); } return splits; @@ -199,13 +207,11 @@ protected RecordReader createDBRecordReader( // the current // value of the URL_PROPERTY... - return new OraOopDBRecordReader(split, inputClass, conf, dbConf - .getConnection(), dbConf, dbConf.getInputConditions(), dbConf + return new OraOopDBRecordReader(split, inputClass, conf, + getConnection(), dbConf, dbConf.getInputConditions(), dbConf .getInputFieldNames(), dbConf.getInputTableName()); } catch (SQLException ex) { throw new IOException(ex); - } catch (ClassNotFoundException ex) { - throw new IOException(ex); } } diff --git a/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java b/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java index 3a8e5d08..0a2e3961 100644 --- a/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java +++ b/src/java/org/apache/sqoop/mapreduce/db/DBInputFormat.java @@ -158,8 +158,11 @@ public void write(DataOutput output) throws IOException { @Override /** {@inheritDoc} */ public void setConf(Configuration conf) { + setDbConf(new DBConfiguration(conf)); + } - dbConf = new DBConfiguration(conf); + public void setDbConf(DBConfiguration dbConf) { + this.dbConf = dbConf; try { getConnection(); @@ -389,7 +392,9 @@ protected void closeConnection() { this.connection.close(); this.connection = null; } - } catch (SQLException sqlE) { /* ignore exception on close. */ } + } catch (SQLException sqlE) { + LOG.error("Cannot close JDBC connection.", sqlE); + } } } diff --git a/src/test/com/cloudera/sqoop/ThirdPartyTests.java b/src/test/com/cloudera/sqoop/ThirdPartyTests.java index 50f31929..3103bd4d 100644 --- a/src/test/com/cloudera/sqoop/ThirdPartyTests.java +++ b/src/test/com/cloudera/sqoop/ThirdPartyTests.java @@ -55,6 +55,7 @@ import org.apache.sqoop.manager.netezza.DirectNetezzaHCatImportManualTest; import org.apache.sqoop.manager.netezza.NetezzaExportManualTest; import org.apache.sqoop.manager.netezza.NetezzaImportManualTest; +import org.apache.sqoop.manager.oracle.OraOopDataDrivenDBInputFormatConnectionCloseTest; import org.apache.sqoop.manager.oracle.OracleCallExportTest; import org.apache.sqoop.manager.oracle.OracleIncrementalImportTest; import org.apache.sqoop.manager.oracle.OracleSplitterTest; @@ -98,6 +99,7 @@ public static Test suite() { suite.addTestSuite(OracleCompatTest.class); suite.addTestSuite(OracleIncrementalImportTest.class); suite.addTestSuite(OracleSplitterTest.class); + suite.addTestSuite(OraOopDataDrivenDBInputFormatConnectionCloseTest.class); // SQL Server suite.addTestSuite(SQLServerDatatypeExportDelimitedFileManualTest.class); diff --git a/src/test/org/apache/sqoop/manager/oracle/OraOopDataDrivenDBInputFormatConnectionCloseTest.java b/src/test/org/apache/sqoop/manager/oracle/OraOopDataDrivenDBInputFormatConnectionCloseTest.java new file mode 100644 index 00000000..78451608 --- /dev/null +++ b/src/test/org/apache/sqoop/manager/oracle/OraOopDataDrivenDBInputFormatConnectionCloseTest.java @@ -0,0 +1,91 @@ +package org.apache.sqoop.manager.oracle; + +import com.cloudera.sqoop.mapreduce.db.DBConfiguration; +import junit.framework.TestCase; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobContext; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.sql.Connection; +import java.sql.DatabaseMetaData; +import java.sql.PreparedStatement; +import java.sql.ResultSet; +import java.sql.SQLException; + +import static org.mockito.Matchers.anyString; +import static org.mockito.Mockito.doThrow; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; +import static org.mockito.Mockito.when; + +public class OraOopDataDrivenDBInputFormatConnectionCloseTest extends TestCase { + + private static final OraOopLog LOG = OraOopLogFactory.getLog( + TestOraOopDataDrivenDBInputFormat.class.getName()); + + private static final String ORACLE_PREPARED_STATEMENT_CLASS = "oracle.jdbc.OraclePreparedStatement"; + + private OraOopDataDrivenDBInputFormat inputFormat; + + private Connection mockConnection; + + private JobContext mockJobContext; + + @Before + public void setUp() throws Exception { + Configuration configuration = new Configuration(); + configuration.set(DBConfiguration.USERNAME_PROPERTY, "Oracle user"); + configuration.setInt(OraOopConstants.ORAOOP_DESIRED_NUMBER_OF_MAPPERS, 1); + + Class preparedStatementClass = + (Class) Class.forName(ORACLE_PREPARED_STATEMENT_CLASS); + PreparedStatement mockPreparedStatement = mock(preparedStatementClass); + ResultSet mockResultSet = mock(ResultSet.class); + when(mockResultSet.next()).thenReturn(true).thenReturn(false); + when(mockPreparedStatement.executeQuery()).thenReturn(mockResultSet); + + mockConnection = mock(Connection.class); + DatabaseMetaData dbMetaData = mock(DatabaseMetaData.class); + when(dbMetaData.getDatabaseProductName()).thenReturn("Oracle"); + when(mockConnection.getMetaData()).thenReturn(dbMetaData); + when(mockConnection.prepareStatement(anyString())).thenReturn(mockPreparedStatement); + + DBConfiguration dbConf = mock(DBConfiguration.class); + when(dbConf.getConnection()).thenReturn(mockConnection); + when(dbConf.getConf()).thenReturn(configuration); + when(dbConf.getInputTableName()).thenReturn("InputTable"); + + mockJobContext = mock(JobContext.class); + when(mockJobContext.getConfiguration()).thenReturn(configuration); + + inputFormat = new OraOopDataDrivenDBInputFormat(); + inputFormat.setDbConf(dbConf); + } + + @Test + public void testGetSplitsClosesConnectionProperly() throws Exception { + inputFormat.getSplits(mockJobContext); + verify(mockConnection).commit(); + verify(mockConnection).close(); + } + + @Test + public void testGetSplitsClosesConnectionProperlyWhenExceptionIsThrown() throws Exception { + + doThrow(new SQLException("For the sake of testing the commit fails.")).when(mockConnection).commit(); + + try { + inputFormat.getSplits(mockJobContext); + } catch (IOException e) { + LOG.debug("An expected exception is thrown in testSplitsClosesConnectionProperlyWhenExceptionIsThrown, ignoring."); + } + + verify(mockConnection).rollback(); + verify(mockConnection).close(); + + } + + +}